Compare commits
21 Commits
d44ba8a0ae
...
e0776a452e
Author | SHA1 | Date | |
---|---|---|---|
e0776a452e | |||
291a21d82a | |||
f401417ce2 | |||
be3ff00edc | |||
879a6604e5 | |||
d88ad099c4 | |||
636c5e25cb | |||
7337538f0b | |||
9e3cc8236c | |||
01171f5de3 | |||
6a7f246562 | |||
49098763f1 | |||
cd160c486e | |||
c12379134c | |||
189bb294cb | |||
7ad4255fa7 | |||
de061eeb0f | |||
767e19b425 | |||
d8201c7f8e | |||
d229e49ff2 | |||
aea9e10d1a |
12
.gitignore
vendored
12
.gitignore
vendored
@ -51,7 +51,6 @@ internal/
|
|||||||
!tests/cases/projects/projectOption/**/node_modules
|
!tests/cases/projects/projectOption/**/node_modules
|
||||||
!tests/cases/projects/NodeModulesSearch/**/*
|
!tests/cases/projects/NodeModulesSearch/**/*
|
||||||
!tests/baselines/reference/project/nodeModules*/**/*
|
!tests/baselines/reference/project/nodeModules*/**/*
|
||||||
.idea
|
|
||||||
yarn.lock
|
yarn.lock
|
||||||
yarn-error.log
|
yarn-error.log
|
||||||
.parallelperf.*
|
.parallelperf.*
|
||||||
@ -78,10 +77,11 @@ node_modules/
|
|||||||
# project specific
|
# project specific
|
||||||
logs/
|
logs/
|
||||||
__pycache__
|
__pycache__
|
||||||
filter/runs
|
ml/filter/runs
|
||||||
pred/runs
|
ml/pred/runs
|
||||||
pred/checkpoints
|
ml/pred/checkpoints
|
||||||
data/
|
ml/pred/observed
|
||||||
filter/checkpoints
|
ml/data/
|
||||||
|
ml/filter/checkpoints
|
||||||
scripts
|
scripts
|
||||||
model/
|
model/
|
||||||
|
9
.idea/.gitignore
vendored
Normal file
9
.idea/.gitignore
vendored
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
# Default ignored files
|
||||||
|
/shelf/
|
||||||
|
/workspace.xml
|
||||||
|
# Editor-based HTTP Client requests
|
||||||
|
/httpRequests/
|
||||||
|
# Datasource local storage ignored files
|
||||||
|
/dataSources/
|
||||||
|
/dataSources.local.xml
|
||||||
|
dataSources.xml
|
21
.idea/cvsa.iml
Normal file
21
.idea/cvsa.iml
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<module type="WEB_MODULE" version="4">
|
||||||
|
<component name="NewModuleRootManager">
|
||||||
|
<content url="file://$MODULE_DIR$">
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/.tmp" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/temp" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/tmp" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/ml/data" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/doc" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/ml/filter/checkpoints" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/ml/filter/runs" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/ml/lab/data" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/ml/lab/temp" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/logs" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/model" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/src/db" />
|
||||||
|
</content>
|
||||||
|
<orderEntry type="inheritedJdk" />
|
||||||
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
|
</component>
|
||||||
|
</module>
|
12
.idea/inspectionProfiles/Project_Default.xml
Normal file
12
.idea/inspectionProfiles/Project_Default.xml
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
<component name="InspectionProjectProfileManager">
|
||||||
|
<profile version="1.0">
|
||||||
|
<option name="myName" value="Project Default" />
|
||||||
|
<inspection_tool class="GrazieInspection" enabled="false" level="GRAMMAR_ERROR" enabled_by_default="false" />
|
||||||
|
<inspection_tool class="LanguageDetectionInspection" enabled="false" level="WARNING" enabled_by_default="false" />
|
||||||
|
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
|
||||||
|
<option name="processCode" value="true" />
|
||||||
|
<option name="processLiterals" value="true" />
|
||||||
|
<option name="processComments" value="true" />
|
||||||
|
</inspection_tool>
|
||||||
|
</profile>
|
||||||
|
</component>
|
8
.idea/modules.xml
Normal file
8
.idea/modules.xml
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="ProjectModuleManager">
|
||||||
|
<modules>
|
||||||
|
<module fileurl="file://$PROJECT_DIR$/.idea/cvsa.iml" filepath="$PROJECT_DIR$/.idea/cvsa.iml" />
|
||||||
|
</modules>
|
||||||
|
</component>
|
||||||
|
</project>
|
6
.idea/sqldialects.xml
Normal file
6
.idea/sqldialects.xml
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="SqlDialectMappings">
|
||||||
|
<file url="PROJECT" dialect="PostgreSQL" />
|
||||||
|
</component>
|
||||||
|
</project>
|
6
.idea/vcs.xml
Normal file
6
.idea/vcs.xml
Normal file
@ -0,0 +1,6 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8"?>
|
||||||
|
<project version="4">
|
||||||
|
<component name="VcsDirectoryMappings">
|
||||||
|
<mapping directory="" vcs="Git" />
|
||||||
|
</component>
|
||||||
|
</project>
|
@ -1,12 +0,0 @@
|
|||||||
import { JSX } from "preact";
|
|
||||||
import { IS_BROWSER } from "$fresh/runtime.ts";
|
|
||||||
|
|
||||||
export function Button(props: JSX.HTMLAttributes<HTMLButtonElement>) {
|
|
||||||
return (
|
|
||||||
<button
|
|
||||||
{...props}
|
|
||||||
disabled={!IS_BROWSER || props.disabled}
|
|
||||||
class="px-2 py-1 border-gray-500 border-2 rounded bg-white hover:bg-gray-200 transition-colors"
|
|
||||||
/>
|
|
||||||
);
|
|
||||||
}
|
|
@ -1,55 +0,0 @@
|
|||||||
import json
|
|
||||||
import random
|
|
||||||
|
|
||||||
def process_data(input_file, output_file):
|
|
||||||
"""
|
|
||||||
从输入文件中读取数据,找出model和human不一致的行,
|
|
||||||
删除"model"键,将"human"键重命名为"label",
|
|
||||||
然后将处理后的数据添加到输出文件中。
|
|
||||||
在写入之前,它会加载output_file中的所有样本,
|
|
||||||
并使用aid键进行去重过滤。
|
|
||||||
|
|
||||||
Args:
|
|
||||||
input_file (str): 输入文件的路径。
|
|
||||||
output_file (str): 输出文件的路径。
|
|
||||||
"""
|
|
||||||
|
|
||||||
# 加载output_file中已有的数据,用于去重
|
|
||||||
existing_data = set()
|
|
||||||
try:
|
|
||||||
with open(output_file, 'r', encoding='utf-8') as f_out:
|
|
||||||
for line in f_out:
|
|
||||||
try:
|
|
||||||
data = json.loads(line)
|
|
||||||
existing_data.add(data['aid'])
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass # 忽略JSON解码错误,继续读取下一行
|
|
||||||
except FileNotFoundError:
|
|
||||||
pass # 如果文件不存在,则忽略
|
|
||||||
|
|
||||||
with open(input_file, 'r', encoding='utf-8') as f_in, open(output_file, 'a', encoding='utf-8') as f_out:
|
|
||||||
for line in f_in:
|
|
||||||
try:
|
|
||||||
data = json.loads(line)
|
|
||||||
|
|
||||||
if data['model'] != data['human'] or random.random() < 0.2:
|
|
||||||
if data['aid'] not in existing_data: # 检查aid是否已存在
|
|
||||||
del data['model']
|
|
||||||
data['label'] = data['human']
|
|
||||||
del data['human']
|
|
||||||
f_out.write(json.dumps(data, ensure_ascii=False) + '\n')
|
|
||||||
existing_data.add(data['aid']) # 将新的aid添加到集合中
|
|
||||||
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
print(f"JSON解码错误: {e}")
|
|
||||||
print(f"错误行内容: {line.strip()}")
|
|
||||||
except KeyError as e:
|
|
||||||
print(f"KeyError: 键 '{e}' 不存在")
|
|
||||||
print(f"错误行内容: {line.strip()}")
|
|
||||||
|
|
||||||
# 调用函数处理数据
|
|
||||||
input_file = 'real_test.jsonl'
|
|
||||||
output_file = 'labeled_data.jsonl'
|
|
||||||
process_data(input_file, output_file)
|
|
||||||
print(f"处理完成,结果已写入 {output_file}")
|
|
||||||
|
|
58
deno.json
58
deno.json
@ -1,55 +1,15 @@
|
|||||||
{
|
{
|
||||||
"lock": false,
|
"lock": false,
|
||||||
"tasks": {
|
"workspace": [
|
||||||
"crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts",
|
"./packages/crawler",
|
||||||
"crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts",
|
"./packages/frontend",
|
||||||
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
|
"./packages/backend",
|
||||||
"cli": "echo \"import '\\$fresh/src/dev/cli.ts'\" | deno run --unstable -A -",
|
"./packages/core"
|
||||||
"manifest": "deno task cli manifest $(pwd)",
|
],
|
||||||
"start": "deno run -A --watch=static/,routes/ dev.ts",
|
|
||||||
"build": "deno run -A dev.ts build",
|
|
||||||
"preview": "deno run -A main.ts",
|
|
||||||
"update": "deno run -A -r https://fresh.deno.dev/update .",
|
|
||||||
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
|
|
||||||
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
|
||||||
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
|
||||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
|
||||||
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
|
||||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
|
||||||
},
|
|
||||||
"lint": {
|
|
||||||
"rules": {
|
|
||||||
"tags": ["fresh", "recommended"]
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"exclude": ["**/_fresh/*"],
|
|
||||||
"imports": {
|
|
||||||
"@std/assert": "jsr:@std/assert@1",
|
|
||||||
"$fresh/": "https://deno.land/x/fresh@1.7.3/",
|
|
||||||
"preact": "https://esm.sh/preact@10.22.0",
|
|
||||||
"preact/": "https://esm.sh/preact@10.22.0/",
|
|
||||||
"@preact/signals": "https://esm.sh/*@preact/signals@1.2.2",
|
|
||||||
"@preact/signals-core": "https://esm.sh/*@preact/signals-core@1.5.1",
|
|
||||||
"tailwindcss": "npm:tailwindcss@3.4.1",
|
|
||||||
"tailwindcss/": "npm:/tailwindcss@3.4.1/",
|
|
||||||
"tailwindcss/plugin": "npm:/tailwindcss@3.4.1/plugin.js",
|
|
||||||
"$std/": "https://deno.land/std@0.216.0/",
|
|
||||||
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
|
|
||||||
"bullmq": "npm:bullmq",
|
|
||||||
"lib/": "./lib/",
|
|
||||||
"ioredis": "npm:ioredis",
|
|
||||||
"@bull-board/api": "npm:@bull-board/api",
|
|
||||||
"@bull-board/express": "npm:@bull-board/express",
|
|
||||||
"express": "npm:express",
|
|
||||||
"src/": "./src/",
|
|
||||||
"onnxruntime": "npm:onnxruntime-node@1.19.2",
|
|
||||||
"chalk": "npm:chalk"
|
|
||||||
},
|
|
||||||
"compilerOptions": {
|
|
||||||
"jsx": "react-jsx",
|
|
||||||
"jsxImportSource": "preact"
|
|
||||||
},
|
|
||||||
"nodeModulesDir": "auto",
|
"nodeModulesDir": "auto",
|
||||||
|
"tasks": {
|
||||||
|
"crawler": "deno task --filter 'crawler' all"
|
||||||
|
},
|
||||||
"fmt": {
|
"fmt": {
|
||||||
"useTabs": true,
|
"useTabs": true,
|
||||||
"lineWidth": 120,
|
"lineWidth": 120,
|
||||||
|
7
dev.ts
7
dev.ts
@ -1,7 +0,0 @@
|
|||||||
#!/usr/bin/env -S deno run -A --watch=static/,routes/
|
|
||||||
|
|
||||||
import dev from "$fresh/dev.ts";
|
|
||||||
import config from "./fresh.config.ts";
|
|
||||||
|
|
||||||
import "$std/dotenv/load.ts";
|
|
||||||
await dev(import.meta.url, "./main.ts", config);
|
|
@ -1,6 +0,0 @@
|
|||||||
import { defineConfig } from "$fresh/server.ts";
|
|
||||||
import tailwind from "$fresh/plugins/tailwind.ts";
|
|
||||||
|
|
||||||
export default defineConfig({
|
|
||||||
plugins: [tailwind()],
|
|
||||||
});
|
|
27
fresh.gen.ts
27
fresh.gen.ts
@ -1,27 +0,0 @@
|
|||||||
// DO NOT EDIT. This file is generated by Fresh.
|
|
||||||
// This file SHOULD be checked into source version control.
|
|
||||||
// This file is automatically updated during development when running `dev.ts`.
|
|
||||||
|
|
||||||
import * as $_404 from "./routes/_404.tsx";
|
|
||||||
import * as $_app from "./routes/_app.tsx";
|
|
||||||
import * as $api_joke from "./routes/api/joke.ts";
|
|
||||||
import * as $greet_name_ from "./routes/greet/[name].tsx";
|
|
||||||
import * as $index from "./routes/index.tsx";
|
|
||||||
import * as $Counter from "./islands/Counter.tsx";
|
|
||||||
import type { Manifest } from "$fresh/server.ts";
|
|
||||||
|
|
||||||
const manifest = {
|
|
||||||
routes: {
|
|
||||||
"./routes/_404.tsx": $_404,
|
|
||||||
"./routes/_app.tsx": $_app,
|
|
||||||
"./routes/api/joke.ts": $api_joke,
|
|
||||||
"./routes/greet/[name].tsx": $greet_name_,
|
|
||||||
"./routes/index.tsx": $index,
|
|
||||||
},
|
|
||||||
islands: {
|
|
||||||
"./islands/Counter.tsx": $Counter,
|
|
||||||
},
|
|
||||||
baseUrl: import.meta.url,
|
|
||||||
} satisfies Manifest;
|
|
||||||
|
|
||||||
export default manifest;
|
|
@ -1,16 +0,0 @@
|
|||||||
import type { Signal } from "@preact/signals";
|
|
||||||
import { Button } from "../components/Button.tsx";
|
|
||||||
|
|
||||||
interface CounterProps {
|
|
||||||
count: Signal<number>;
|
|
||||||
}
|
|
||||||
|
|
||||||
export default function Counter(props: CounterProps) {
|
|
||||||
return (
|
|
||||||
<div class="flex gap-8 py-6">
|
|
||||||
<Button onClick={() => props.count.value -= 1}>-1</Button>
|
|
||||||
<p class="text-3xl tabular-nums">{props.count}</p>
|
|
||||||
<Button onClick={() => props.count.value += 1}>+1</Button>
|
|
||||||
</div>
|
|
||||||
);
|
|
||||||
}
|
|
@ -1,22 +0,0 @@
|
|||||||
import { AIManager } from "lib/ml/manager.ts";
|
|
||||||
import * as ort from "onnxruntime";
|
|
||||||
import logger from "lib/log/logger.ts";
|
|
||||||
import { WorkerError } from "lib/mq/schema.ts";
|
|
||||||
|
|
||||||
const modelPath = "./model/model.onnx";
|
|
||||||
|
|
||||||
class MantisProto extends AIManager {
|
|
||||||
constructor() {
|
|
||||||
super();
|
|
||||||
this.models = {
|
|
||||||
"predictor": modelPath,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
public override async init(): Promise<void> {
|
|
||||||
await super.init();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const Mantis = new MantisProto();
|
|
||||||
export default Mantis;
|
|
@ -1 +0,0 @@
|
|||||||
export * from "lib/mq/exec/getLatestVideos.ts";
|
|
13
main.ts
13
main.ts
@ -1,13 +0,0 @@
|
|||||||
/// <reference no-default-lib="true" />
|
|
||||||
/// <reference lib="dom" />
|
|
||||||
/// <reference lib="dom.iterable" />
|
|
||||||
/// <reference lib="dom.asynciterable" />
|
|
||||||
/// <reference lib="deno.ns" />
|
|
||||||
|
|
||||||
import "$std/dotenv/load.ts";
|
|
||||||
|
|
||||||
import { start } from "$fresh/server.ts";
|
|
||||||
import manifest from "./fresh.gen.ts";
|
|
||||||
import config from "./fresh.config.ts";
|
|
||||||
|
|
||||||
await start(manifest, config);
|
|
0
lab/.gitignore → ml/lab/.gitignore
vendored
0
lab/.gitignore → ml/lab/.gitignore
vendored
0
packages/backend/deno.json
Normal file
0
packages/backend/deno.json
Normal file
0
packages/core/deno.json
Normal file
0
packages/core/deno.json
Normal file
@ -1,6 +1,6 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
|
import { AllDataType, BiliUserType } from "db/schema.d.ts";
|
||||||
import Akari from "lib/ml/akari.ts";
|
import Akari from "ml/akari.ts";
|
||||||
|
|
||||||
export async function videoExistsInAllData(client: Client, aid: number) {
|
export async function videoExistsInAllData(client: Client, aid: number) {
|
||||||
return await client.queryObject<{ exists: boolean }>(
|
return await client.queryObject<{ exists: boolean }>(
|
@ -1,5 +1,5 @@
|
|||||||
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { postgresConfig } from "lib/db/pgConfig.ts";
|
import { postgresConfig } from "db/pgConfig.ts";
|
||||||
|
|
||||||
const pool = new Pool(postgresConfig, 12);
|
const pool = new Pool(postgresConfig, 12);
|
||||||
|
|
@ -1,5 +1,5 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
import { LatestSnapshotType } from "db/schema.d.ts";
|
||||||
|
|
||||||
export async function getVideosNearMilestone(client: Client) {
|
export async function getVideosNearMilestone(client: Client) {
|
||||||
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
@ -1,16 +1,13 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
|
||||||
import { SnapshotScheduleType } from "./schema.d.ts";
|
import { SnapshotScheduleType } from "./schema.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { MINUTE } from "$std/datetime/constants.ts";
|
import { MINUTE } from "$std/datetime/constants.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
import { Redis } from "ioredis";
|
import { Redis } from "ioredis";
|
||||||
|
|
||||||
const WINDOW_SIZE = 2880;
|
|
||||||
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
||||||
|
|
||||||
let lastAvailableWindow: { offset: number; count: number } | null = null;
|
|
||||||
|
|
||||||
function getCurrentWindowIndex(): number {
|
function getCurrentWindowIndex(): number {
|
||||||
const now = new Date();
|
const now = new Date();
|
||||||
const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
|
const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
|
||||||
@ -31,7 +28,7 @@ export async function refreshSnapshotWindowCounts(client: Client, redisClient: R
|
|||||||
WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
|
WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
|
||||||
GROUP BY 1
|
GROUP BY 1
|
||||||
ORDER BY window_start
|
ORDER BY window_start
|
||||||
`
|
`;
|
||||||
|
|
||||||
await redisClient.del(REDIS_KEY);
|
await redisClient.del(REDIS_KEY);
|
||||||
|
|
||||||
@ -39,13 +36,11 @@ export async function refreshSnapshotWindowCounts(client: Client, redisClient: R
|
|||||||
|
|
||||||
for (const row of result.rows) {
|
for (const row of result.rows) {
|
||||||
const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
|
const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
|
||||||
const offset = (currentWindow + targetOffset);
|
const offset = currentWindow + targetOffset;
|
||||||
if (offset >= 0 && offset < WINDOW_SIZE) {
|
if (offset >= 0) {
|
||||||
await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
|
await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastAvailableWindow = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
|
export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
|
||||||
@ -84,6 +79,14 @@ export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
|||||||
return res.rows.length > 0;
|
return res.rows.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function bulkGetVideosWithoutProcessingSchedules(client: Client, aids: number[]) {
|
||||||
|
const res = await client.queryObject<{ aid: number }>(
|
||||||
|
`SELECT aid FROM snapshot_schedule WHERE aid = ANY($1) AND status != 'processing' GROUP BY aid`,
|
||||||
|
[aids],
|
||||||
|
);
|
||||||
|
return res.rows.map((row) => row.aid);
|
||||||
|
}
|
||||||
|
|
||||||
interface Snapshot {
|
interface Snapshot {
|
||||||
created_at: number;
|
created_at: number;
|
||||||
views: number;
|
views: number;
|
||||||
@ -183,7 +186,13 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
|
|||||||
* @param aid The aid of the video.
|
* @param aid The aid of the video.
|
||||||
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
||||||
*/
|
*/
|
||||||
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) {
|
export async function scheduleSnapshot(
|
||||||
|
client: Client,
|
||||||
|
aid: number,
|
||||||
|
type: string,
|
||||||
|
targetTime: number,
|
||||||
|
force: boolean = false,
|
||||||
|
) {
|
||||||
if (await videoHasActiveSchedule(client, aid) && !force) return;
|
if (await videoHasActiveSchedule(client, aid) && !force) return;
|
||||||
let adjustedTime = new Date(targetTime);
|
let adjustedTime = new Date(targetTime);
|
||||||
if (type !== "milestone" && type !== "new") {
|
if (type !== "milestone" && type !== "new") {
|
||||||
@ -196,6 +205,18 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function bulkScheduleSnapshot(
|
||||||
|
client: Client,
|
||||||
|
aids: number[],
|
||||||
|
type: string,
|
||||||
|
targetTime: number,
|
||||||
|
force: boolean = false,
|
||||||
|
) {
|
||||||
|
for (const aid of aids) {
|
||||||
|
await scheduleSnapshot(client, aid, type, targetTime, force);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function adjustSnapshotTime(
|
export async function adjustSnapshotTime(
|
||||||
expectedStartTime: Date,
|
expectedStartTime: Date,
|
||||||
allowedCounts: number = 1000,
|
allowedCounts: number = 1000,
|
||||||
@ -204,21 +225,19 @@ export async function adjustSnapshotTime(
|
|||||||
const currentWindow = getCurrentWindowIndex();
|
const currentWindow = getCurrentWindowIndex();
|
||||||
const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
|
const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
|
||||||
|
|
||||||
let initialOffset = currentWindow + Math.max(targetOffset, 0);
|
const initialOffset = currentWindow + Math.max(targetOffset, 0);
|
||||||
|
|
||||||
if (lastAvailableWindow && lastAvailableWindow.count < allowedCounts) {
|
|
||||||
initialOffset = Math.max(lastAvailableWindow.offset - 2, 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
let timePerIteration = 0;
|
let timePerIteration = 0;
|
||||||
|
const MAX_ITERATIONS = 2880;
|
||||||
|
let iters = 0;
|
||||||
const t = performance.now();
|
const t = performance.now();
|
||||||
for (let i = initialOffset; i < WINDOW_SIZE; i++) {
|
for (let i = initialOffset; i < MAX_ITERATIONS; i++) {
|
||||||
|
iters++;
|
||||||
const offset = i;
|
const offset = i;
|
||||||
const count = await getWindowCount(redisClient, offset);
|
const count = await getWindowCount(redisClient, offset);
|
||||||
|
|
||||||
if (count < allowedCounts) {
|
if (count < allowedCounts) {
|
||||||
const newCount = await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
|
await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
|
||||||
lastAvailableWindow = { offset, count: newCount };
|
|
||||||
|
|
||||||
const startPoint = new Date();
|
const startPoint = new Date();
|
||||||
startPoint.setHours(0, 0, 0, 0);
|
startPoint.setHours(0, 0, 0, 0);
|
||||||
@ -230,28 +249,27 @@ export async function adjustSnapshotTime(
|
|||||||
|
|
||||||
if (delayedDate.getTime() < now.getTime()) {
|
if (delayedDate.getTime() < now.getTime()) {
|
||||||
const elapsed = performance.now() - t;
|
const elapsed = performance.now() - t;
|
||||||
timePerIteration = elapsed / (i+1);
|
timePerIteration = elapsed / (i + 1);
|
||||||
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
|
logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime");
|
||||||
return now;
|
return now;
|
||||||
}
|
}
|
||||||
const elapsed = performance.now() - t;
|
const elapsed = performance.now() - t;
|
||||||
timePerIteration = elapsed / (i+1);
|
timePerIteration = elapsed / (i + 1);
|
||||||
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
|
logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime");
|
||||||
return delayedDate;
|
return delayedDate;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const elapsed = performance.now() - t;
|
const elapsed = performance.now() - t;
|
||||||
timePerIteration = elapsed / WINDOW_SIZE;
|
timePerIteration = elapsed / MAX_ITERATIONS;
|
||||||
logger.log(`${timePerIteration.toFixed(3)}ms * ${WINDOW_SIZE}iterations`, "perf", "fn:adjustSnapshotTime");
|
logger.log(`${timePerIteration.toFixed(3)}ms * ${MAX_ITERATIONS} iterations`, "perf", "fn:adjustSnapshotTime");
|
||||||
return expectedStartTime;
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
export async function getSnapshotsInNextSecond(client: Client) {
|
export async function getSnapshotsInNextSecond(client: Client) {
|
||||||
const query = `
|
const query = `
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM snapshot_schedule
|
FROM snapshot_schedule
|
||||||
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
|
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal'
|
||||||
ORDER BY
|
ORDER BY
|
||||||
CASE
|
CASE
|
||||||
WHEN type = 'milestone' THEN 0
|
WHEN type = 'milestone' THEN 0
|
||||||
@ -264,6 +282,18 @@ export async function getSnapshotsInNextSecond(client: Client) {
|
|||||||
return res.rows;
|
return res.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getBulkSnapshotsInNextSecond(client: Client) {
|
||||||
|
const query = `
|
||||||
|
SELECT *
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
|
||||||
|
ORDER BY started_at
|
||||||
|
LIMIT 1000;
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
|
return res.rows;
|
||||||
|
}
|
||||||
|
|
||||||
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
||||||
return await client.queryObject(
|
return await client.queryObject(
|
||||||
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
||||||
@ -271,6 +301,13 @@ export async function setSnapshotStatus(client: Client, id: number, status: stri
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function bulkSetSnapshotStatus(client: Client, ids: number[], status: string) {
|
||||||
|
return await client.queryObject(
|
||||||
|
`UPDATE snapshot_schedule SET status = $2 WHERE id = ANY($1)`,
|
||||||
|
[ids, status],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||||
const query: string = `
|
const query: string = `
|
||||||
SELECT s.aid
|
SELECT s.aid
|
@ -1,5 +1,5 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
|
||||||
|
|
||||||
export async function getNotCollectedSongs(client: Client) {
|
export async function getNotCollectedSongs(client: Client) {
|
||||||
const queryResult = await client.queryObject<{ aid: number }>(`
|
const queryResult = await client.queryObject<{ aid: number }>(`
|
43
packages/crawler/deno.json
Normal file
43
packages/crawler/deno.json
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
{
|
||||||
|
"name": "@cvsa/crawler",
|
||||||
|
"tasks": {
|
||||||
|
"crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts",
|
||||||
|
"crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts",
|
||||||
|
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
|
||||||
|
"manifest": "deno task cli manifest $(pwd)",
|
||||||
|
"start": "deno run -A --watch=static/,routes/ dev.ts",
|
||||||
|
"build": "deno run -A dev.ts build",
|
||||||
|
"preview": "deno run -A main.ts",
|
||||||
|
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
|
||||||
|
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||||
|
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||||
|
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||||
|
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||||
|
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||||
|
},
|
||||||
|
"lint": {
|
||||||
|
"rules": {
|
||||||
|
"tags": ["recommended"]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"imports": {
|
||||||
|
"@std/assert": "jsr:@std/assert@1",
|
||||||
|
"$std/": "https://deno.land/std@0.216.0/",
|
||||||
|
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
|
||||||
|
"bullmq": "npm:bullmq",
|
||||||
|
"mq/": "./mq/",
|
||||||
|
"db/": "./db/",
|
||||||
|
"log/": "./log/",
|
||||||
|
"net/": "./net/",
|
||||||
|
"ml/": "./ml/",
|
||||||
|
"utils/": "./utils/",
|
||||||
|
"ioredis": "npm:ioredis",
|
||||||
|
"@bull-board/api": "npm:@bull-board/api",
|
||||||
|
"@bull-board/express": "npm:@bull-board/express",
|
||||||
|
"express": "npm:express",
|
||||||
|
"src/": "./src/",
|
||||||
|
"onnxruntime": "npm:onnxruntime-node@1.19.2",
|
||||||
|
"chalk": "npm:chalk"
|
||||||
|
},
|
||||||
|
"exports": "./main.ts"
|
||||||
|
}
|
@ -1,4 +1,4 @@
|
|||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
logger.error(Error("test error"), "test service");
|
logger.error(Error("test error"), "test service");
|
||||||
logger.debug(`some string`);
|
logger.debug(`some string`);
|
7
packages/crawler/main.ts
Normal file
7
packages/crawler/main.ts
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
// DENO ASK ME TO EXPORT SOMETHING WHEN 'name' IS SPECIFIED
|
||||||
|
// AND IF I DON'T SPECIFY 'name', THE --filter FLAG IN `deno task` WON'T WORK.
|
||||||
|
// I DONT'T KNOW WHY
|
||||||
|
// SO HERE'S A PLACHOLDER EXPORT FOR DENO:
|
||||||
|
export const DENO = "FUCK YOU DENO";
|
||||||
|
// Oh, maybe export the version is a good idea
|
||||||
|
export const VERSION = "1.0.13";
|
@ -1,12 +1,12 @@
|
|||||||
import { AIManager } from "lib/ml/manager.ts";
|
import { AIManager } from "ml/manager.ts";
|
||||||
import * as ort from "onnxruntime";
|
import * as ort from "onnxruntime";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
|
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
|
||||||
|
|
||||||
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
|
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
|
||||||
const onnxClassifierPath = "./model/akari/3.17.onnx";
|
const onnxClassifierPath = "../../model/akari/3.17.onnx";
|
||||||
const onnxEmbeddingPath = "./model/embedding/model.onnx";
|
const onnxEmbeddingPath = "../../model/embedding/model.onnx";
|
||||||
|
|
||||||
class AkariProto extends AIManager {
|
class AkariProto extends AIManager {
|
||||||
private tokenizer: PreTrainedTokenizer | null = null;
|
private tokenizer: PreTrainedTokenizer | null = null;
|
@ -1,6 +1,6 @@
|
|||||||
import * as ort from "onnxruntime";
|
import * as ort from "onnxruntime";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
|
|
||||||
export class AIManager {
|
export class AIManager {
|
||||||
public sessions: { [key: string]: ort.InferenceSession } = {};
|
public sessions: { [key: string]: ort.InferenceSession } = {};
|
@ -1,13 +1,13 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
|
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts";
|
||||||
import Akari from "lib/ml/akari.ts";
|
import Akari from "ml/akari.ts";
|
||||||
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { aidExistsInSongs } from "lib/db/songs.ts";
|
import { aidExistsInSongs } from "db/songs.ts";
|
||||||
import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
|
import { insertIntoSongs } from "mq/task/collectSongs.ts";
|
||||||
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
|
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
|
||||||
import { MINUTE } from "$std/datetime/constants.ts";
|
import { MINUTE } from "$std/datetime/constants.ts";
|
||||||
|
|
||||||
export const classifyVideoWorker = async (job: Job) => {
|
export const classifyVideoWorker = async (job: Job) => {
|
@ -1,8 +1,8 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts";
|
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts";
|
import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
|
||||||
import { collectSongs } from "lib/mq/task/collectSongs.ts";
|
import { collectSongs } from "mq/task/collectSongs.ts";
|
||||||
|
|
||||||
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
|
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
@ -1,9 +1,13 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
|
import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
|
||||||
import {
|
import {
|
||||||
|
bulkGetVideosWithoutProcessingSchedules,
|
||||||
|
bulkScheduleSnapshot,
|
||||||
|
bulkSetSnapshotStatus,
|
||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
findSnapshotBefore,
|
findSnapshotBefore,
|
||||||
|
getBulkSnapshotsInNextSecond,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond,
|
||||||
getVideosWithoutActiveSnapshotSchedule,
|
getVideosWithoutActiveSnapshotSchedule,
|
||||||
@ -12,17 +16,18 @@ import {
|
|||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
snapshotScheduleExists,
|
snapshotScheduleExists,
|
||||||
videoHasProcessingSchedule,
|
videoHasProcessingSchedule,
|
||||||
} from "lib/db/snapshotSchedule.ts";
|
} from "db/snapshotSchedule.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
import { SnapshotQueue } from "mq/index.ts";
|
||||||
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
|
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
||||||
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
import { NetSchedulerError } from "mq/scheduler.ts";
|
||||||
import { getBiliVideoStatus, setBiliVideoStatus } from "lib/db/allData.ts";
|
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
||||||
import { truncate } from "lib/utils/truncate.ts";
|
import { truncate } from "utils/truncate.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { getSongsPublihsedAt } from "lib/db/songs.ts";
|
import { getSongsPublihsedAt } from "db/songs.ts";
|
||||||
|
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
@ -35,12 +40,40 @@ const snapshotTypeToTaskMap: { [key: string]: string } = {
|
|||||||
"new": "snapshotMilestoneVideo",
|
"new": "snapshotMilestoneVideo",
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * Tick handler that dispatches the bulk snapshot schedules that are due.
 *
 * The due schedules are split into groups; for every group, the schedules whose
 * video has no schedule in "processing" state are marked as "processing" and
 * handed to the snapshot queue as one "bulkSnapshotVideo" job. The job payload
 * is `{ map }`, where `map` maps schedule id -> video aid.
 *
 * Errors are logged and swallowed; the database client is always released.
 *
 * @param _job - The BullMQ job that triggered the tick (unused).
 */
export const bulkSnapshotTickWorker = async (_job: Job) => {
    // Maximum number of schedules packed into a single "bulkSnapshotVideo" job.
    const groupSize = 30;
    const client = await db.connect();
    try {
        const schedules = await getBulkSnapshotsInNextSecond(client);
        for (let offset = 0; offset < schedules.length; offset += groupSize) {
            const group = schedules.slice(offset, offset + groupSize);
            const aids = group.map((schedule) => Number(schedule.aid));
            const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
            if (filteredAids.length === 0) continue;
            const dispatchableAids = new Set(filteredAids.map((aid) => Number(aid)));

            // Only dispatch schedules whose video passed the filter above; otherwise a
            // video that is already being processed would be snapshotted a second time.
            const dataMap: { [key: number]: number } = {};
            for (const schedule of group) {
                const aid = Number(schedule.aid);
                if (!dispatchableAids.has(aid)) continue;
                dataMap[Number(schedule.id)] = aid;
            }

            // NOTE(review): takeBulkSnapshotForVideosWorker passes schedule ids to
            // bulkSetSnapshotStatus, so schedule ids (not video aids) are passed here
            // as well — confirm against the helper's definition.
            const scheduleIds = Object.keys(dataMap).map((id) => Number(id));
            await bulkSetSnapshotStatus(client, scheduleIds, "processing");

            await SnapshotQueue.add("bulkSnapshotVideo", {
                map: dataMap,
            }, { priority: 3 });
        }
    } catch (e) {
        logger.error(e as Error);
    } finally {
        client.release();
    }
};
|
||||||
|
|
||||||
export const snapshotTickWorker = async (_job: Job) => {
|
export const snapshotTickWorker = async (_job: Job) => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
try {
|
try {
|
||||||
const schedules = await getSnapshotsInNextSecond(client);
|
const schedules = await getSnapshotsInNextSecond(client);
|
||||||
for (const schedule of schedules) {
|
for (const schedule of schedules) {
|
||||||
if (await videoHasProcessingSchedule(client, schedule.aid)) {
|
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
||||||
return `ALREADY_PROCESSING`;
|
return `ALREADY_PROCESSING`;
|
||||||
}
|
}
|
||||||
let priority = 3;
|
let priority = 3;
|
||||||
@ -140,7 +173,8 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const date = new Date(now - 24 * HOUR);
|
const date = new Date(now - 24 * HOUR);
|
||||||
const oldSnapshot = await findSnapshotBefore(client, aid, date);
|
let oldSnapshot = await findSnapshotBefore(client, aid, date);
|
||||||
|
if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
|
||||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||||
if (!oldSnapshot || !latestSnapshot) return 0;
|
if (!oldSnapshot || !latestSnapshot) return 0;
|
||||||
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
|
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
|
||||||
@ -148,7 +182,7 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
|||||||
if (hoursDiff < 8) return 24;
|
if (hoursDiff < 8) return 24;
|
||||||
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
|
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
|
||||||
if (viewsDiff === 0) return 72;
|
if (viewsDiff === 0) return 72;
|
||||||
const speedPerDay = viewsDiff / hoursDiff * 24;
|
const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
|
||||||
if (speedPerDay < 6) return 36;
|
if (speedPerDay < 6) return 36;
|
||||||
if (speedPerDay < 120) return 24;
|
if (speedPerDay < 120) return 24;
|
||||||
if (speedPerDay < 320) return 12;
|
if (speedPerDay < 320) return 12;
|
||||||
@ -172,7 +206,7 @@ export const regularSnapshotsWorker = async (_job: Job) => {
|
|||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||||
const interval = await getRegularSnapshotInterval(client, aid);
|
const interval = await getRegularSnapshotInterval(client, aid);
|
||||||
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
|
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
||||||
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
|
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
|
||||||
await scheduleSnapshot(client, aid, "normal", targetTime);
|
await scheduleSnapshot(client, aid, "normal", targetTime);
|
||||||
if (now - startedAt > 25 * MINUTE) {
|
if (now - startedAt > 25 * MINUTE) {
|
||||||
@ -187,6 +221,72 @@ export const regularSnapshotsWorker = async (_job: Job) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
 * Worker for "bulkSnapshotVideo" jobs: takes snapshots for a batch of videos
 * with a single bulk request and stores one `video_snapshot` row per video
 * returned by the API.
 *
 * Flow:
 * 1. Keep only the aids whose schedule still exists.
 * 2. Fetch the stats for those aids in bulk.
 * 3. If the API answers with an error code, mark the schedules as "failed" and
 *    re-schedule the aids 15 seconds later.
 * 4. Otherwise insert the snapshots, mark the schedules as "completed" and
 *    schedule the next regular snapshot for every fetched aid.
 *
 * @param job - BullMQ job; `job.data.map` maps schedule id -> video aid.
 * @returns `DONE` on success, `GET_BILI_STATUS_<code>` when the API returned an
 *          error code, `NO_VALID_SCHEDULE` when none of the schedules exists
 *          anymore, or `undefined` when an exception was handled.
 */
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
    // Tolerate a missing payload instead of throwing before a client is acquired.
    const dataMap: { [key: number]: number } = job.data?.map ?? {};
    const ids = Object.keys(dataMap).map((id) => Number(id));
    const aidsToFetch: number[] = [];
    const client = await db.connect();
    try {
        for (const id of ids) {
            const aid = Number(dataMap[id]);
            const exists = await snapshotScheduleExists(client, id);
            if (!exists) {
                continue;
            }
            aidsToFetch.push(aid);
        }

        // Nothing left to snapshot: do not spend a rate-limited request on an
        // empty resource list.
        if (aidsToFetch.length === 0) {
            return `NO_VALID_SCHEDULE`;
        }

        const data = await bulkGetVideoStats(aidsToFetch);
        // A number is the error code returned by the API.
        if (typeof data === "number") {
            await bulkSetSnapshotStatus(client, ids, "failed");
            await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
            return `GET_BILI_STATUS_${data}`;
        }

        // The statement is identical for every video, so build it once.
        const insertSnapshotQuery: string = `
            INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        `;
        for (const video of data) {
            const aid = video.id;
            const stat = video.cnt_info;
            const views = stat.play;
            const danmakus = stat.danmaku;
            const replies = stat.reply;
            const likes = stat.thumb_up;
            const coins = stat.coin;
            const shares = stat.share;
            const favorites = stat.collect;
            await client.queryObject(
                insertSnapshotQuery,
                [aid, views, danmakus, replies, likes, coins, shares, favorites],
            );

            logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
        }
        await bulkSetSnapshotStatus(client, ids, "completed");

        // Plan the next regular snapshot for every video of this batch.
        for (const aid of aidsToFetch) {
            const interval = await getRegularSnapshotInterval(client, aid);
            logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
            await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
        }
        return `DONE`;
    } catch (e) {
        if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
            logger.warn(
                `No available proxy for bulk request now.`,
                "mq",
                "fn:takeBulkSnapshotForVideosWorker",
            );
            // The current schedules are closed as "completed" and the aids are
            // re-scheduled 2 minutes later.
            await bulkSetSnapshotStatus(client, ids, "completed");
            await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
            return;
        }
        logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
        await bulkSetSnapshotStatus(client, ids, "failed");
    } finally {
        client.release();
    }
};
|
||||||
|
|
||||||
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||||
const id = job.data.id;
|
const id = job.data.id;
|
||||||
const aid = Number(job.data.aid);
|
const aid = Number(job.data.aid);
|
||||||
@ -215,7 +315,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
await setSnapshotStatus(client, id, "completed");
|
await setSnapshotStatus(client, id, "completed");
|
||||||
if (type === "normal") {
|
if (type === "normal") {
|
||||||
const interval = await getRegularSnapshotInterval(client, aid);
|
const interval = await getRegularSnapshotInterval(client, aid);
|
||||||
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
|
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
|
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
|
||||||
return `DONE`;
|
return `DONE`;
|
||||||
} else if (type === "new") {
|
} else if (type === "new") {
|
||||||
@ -282,7 +382,11 @@ export const scheduleCleanupWorker = async (_job: Job) => {
|
|||||||
const type = row.type;
|
const type = row.type;
|
||||||
await setSnapshotStatus(client, id, "timeout");
|
await setSnapshotStatus(client, id, "timeout");
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
|
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
|
||||||
logger.log(`Schedule ${id} has no response received for 5 minutes, rescheduled.`, "mq", "fn:scheduleCleanupWorker")
|
logger.log(
|
||||||
|
`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
|
||||||
|
"mq",
|
||||||
|
"fn:scheduleCleanupWorker",
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
|
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
|
1
packages/crawler/mq/executors.ts
Normal file
1
packages/crawler/mq/executors.ts
Normal file
@ -0,0 +1 @@
|
|||||||
|
export * from "mq/exec/getLatestVideos.ts";
|
@ -1,9 +1,9 @@
|
|||||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { initSnapshotWindowCounts } from "lib/db/snapshotSchedule.ts";
|
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
|
|
||||||
export async function initMQ() {
|
export async function initMQ() {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
@ -35,6 +35,16 @@ export async function initMQ() {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await SnapshotQueue.upsertJobScheduler("bulkSnapshotTick", {
|
||||||
|
every: 15 * SECOND,
|
||||||
|
immediately: true,
|
||||||
|
}, {
|
||||||
|
opts: {
|
||||||
|
removeOnComplete: 1,
|
||||||
|
removeOnFail: 1,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
@ -1,5 +1,5 @@
|
|||||||
import { Redis } from "ioredis";
|
import { Redis } from "ioredis";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
|
|
||||||
class LockManager {
|
class LockManager {
|
||||||
private redis: Redis;
|
private redis: Redis;
|
@ -1,4 +1,4 @@
|
|||||||
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
|
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||||
|
|
||||||
export interface RateLimiterConfig {
|
export interface RateLimiterConfig {
|
||||||
window: SlidingWindow;
|
window: SlidingWindow;
|
@ -1,7 +1,7 @@
|
|||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { RateLimiter, RateLimiterConfig } from "lib/mq/rateLimiter.ts";
|
import { RateLimiter, RateLimiterConfig } from "mq/rateLimiter.ts";
|
||||||
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
|
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
import { SECOND } from "$std/datetime/constants.ts";
|
import { SECOND } from "$std/datetime/constants.ts";
|
||||||
|
|
||||||
@ -333,12 +333,18 @@ const biliLimiterConfig: RateLimiterConfig[] = [
|
|||||||
},
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
// Derived limiter configs for the "bili_test" and "bili_strict" providers.
// Every entry is copied individually: a shallow array copy such as
// `[...biliLimiterConfig]` still shares the entry objects, so the `max`
// overrides below would also rewrite `biliLimiterConfig` and each other.
// NOTE(review): the `window` of each entry is still the same instance as in
// `biliLimiterConfig` (as it was before) — confirm whether that sharing is intended.
const bili_test = biliLimiterConfig.map((config) => ({ ...config }));
bili_test[0].max = 10;
bili_test[1].max = 36;
bili_test[2].max = 150;
bili_test[3].max = 1000;

const bili_strict = biliLimiterConfig.map((config) => ({ ...config }));
bili_strict[0].max = 1;
bili_strict[1].max = 4;
bili_strict[2].max = 12;
bili_strict[3].max = 100;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Execution order for setup:
|
Execution order for setup:
|
||||||
|
|
||||||
@ -378,11 +384,21 @@ netScheduler.addTask("snapshotVideo", "bili_test", [
|
|||||||
"alicloud-shenzhen",
|
"alicloud-shenzhen",
|
||||||
"alicloud-hohhot",
|
"alicloud-hohhot",
|
||||||
]);
|
]);
|
||||||
|
netScheduler.addTask("bulkSnapshot", "bili_strict", [
|
||||||
|
"alicloud-qingdao",
|
||||||
|
"alicloud-shanghai",
|
||||||
|
"alicloud-zhangjiakou",
|
||||||
|
"alicloud-chengdu",
|
||||||
|
"alicloud-shenzhen",
|
||||||
|
"alicloud-hohhot",
|
||||||
|
]);
|
||||||
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
|
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
|
||||||
netScheduler.setTaskLimiter("getLatestVideos", null);
|
netScheduler.setTaskLimiter("getLatestVideos", null);
|
||||||
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
|
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
|
||||||
netScheduler.setTaskLimiter("snapshotVideo", null);
|
netScheduler.setTaskLimiter("snapshotVideo", null);
|
||||||
|
netScheduler.setTaskLimiter("bulkSnapshot", null);
|
||||||
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
|
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
|
||||||
netScheduler.setProviderLimiter("bili_test", bili_test);
|
netScheduler.setProviderLimiter("bili_test", bili_test);
|
||||||
|
netScheduler.setProviderLimiter("bili_strict", bili_strict);
|
||||||
|
|
||||||
export default netScheduler;
|
export default netScheduler;
|
@ -1,7 +1,7 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
|
import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
|
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
|
||||||
import { MINUTE } from "$std/datetime/constants.ts";
|
import { MINUTE } from "$std/datetime/constants.ts";
|
||||||
|
|
||||||
export async function collectSongs(client: Client) {
|
export async function collectSongs(client: Client) {
|
@ -1,9 +1,9 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { getVideoDetails } from "lib/net/getVideoDetails.ts";
|
import { getVideoDetails } from "net/getVideoDetails.ts";
|
||||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||||
import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts";
|
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
|
||||||
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
||||||
|
|
||||||
export async function insertVideoInfo(client: Client, aid: number) {
|
export async function insertVideoInfo(client: Client, aid: number) {
|
@ -1,7 +1,7 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
|
import { getVideoInfo } from "net/getVideoInfo.ts";
|
||||||
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
import { LatestSnapshotType } from "db/schema.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Fetch video stats from bilibili API and insert into database
|
* Fetch video stats from bilibili API and insert into database
|
@ -1,10 +1,10 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { getLatestVideoAids } from "lib/net/getLatestVideoAids.ts";
|
import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
|
||||||
import { videoExistsInAllData } from "lib/db/allData.ts";
|
import { videoExistsInAllData } from "db/allData.ts";
|
||||||
import { sleep } from "lib/utils/sleep.ts";
|
import { sleep } from "utils/sleep.ts";
|
||||||
import { SECOND } from "$std/datetime/constants.ts";
|
import { SECOND } from "$std/datetime/constants.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { LatestVideosQueue } from "lib/mq/index.ts";
|
import { LatestVideosQueue } from "mq/index.ts";
|
||||||
|
|
||||||
export async function queueLatestVideos(
|
export async function queueLatestVideos(
|
||||||
client: Client,
|
client: Client,
|
@ -11,11 +11,11 @@ export type VideoTagsResponse = BaseResponse<VideoTagsData>;
|
|||||||
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
|
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
|
||||||
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
|
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
|
||||||
|
|
||||||
type MediaListInfoData = MediaListInfoItem[];
|
export type MediaListInfoData = MediaListInfoItem[];
|
||||||
|
|
||||||
|
export interface MediaListInfoItem {
|
||||||
interface MediaListInfoItem {
|
attr: number;
|
||||||
bvid: string;
|
bvid: string;
|
||||||
id: number;
|
id: number;
|
||||||
cnt_info: {
|
cnt_info: {
|
||||||
coin: number;
|
coin: number;
|
||||||
@ -25,7 +25,7 @@ interface MediaListInfoItem {
|
|||||||
reply: number;
|
reply: number;
|
||||||
share: number;
|
share: number;
|
||||||
thumb_up: number;
|
thumb_up: number;
|
||||||
}
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
interface VideoInfoData {
|
interface VideoInfoData {
|
27
packages/crawler/net/bulkGetVideoStats.ts
Normal file
27
packages/crawler/net/bulkGetVideoStats.ts
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
import netScheduler from "mq/scheduler.ts";
|
||||||
|
import { MediaListInfoData, MediaListInfoResponse } from "net/bilibili.d.ts";
|
||||||
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
|
/**
 * Bulk fetch video metadata from bilibili API.
 *
 * The request is issued through the net scheduler as a "bulkSnapshot" task.
 * An empty aid list resolves to an empty result without any request.
 *
 * @param aids - The aid list to fetch
 * @returns MediaListInfoData or the error code returned by bilibili API
 * @throws {NetSchedulerError} - The error will be thrown in following cases:
 * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
 * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
 * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
 */
export async function bulkGetVideoStats(aids: number[]): Promise<MediaListInfoData | number> {
    if (aids.length === 0) {
        return [];
    }
    const baseURL = `https://api.bilibili.com/medialist/gateway/base/resource/infos?resources=`;
    // Each resource is written as `<aid>:2`; joining avoids a trailing comma.
    const resources = aids.map((aid) => `${aid}:2`).join(",");
    const data = await netScheduler.request<MediaListInfoResponse>(baseURL + resources, "bulkSnapshot");
    if (data.code !== 0) {
        const errMessage = `Error fetching metadata for aid list: ${aids.join(",")}:`;
        logger.error(errMessage + data.code + "-" + data.message, "net", "fn:bulkGetVideoStats");
        return data.code;
    }
    return data.data;
}
|
@ -1,6 +1,6 @@
|
|||||||
import { VideoListResponse } from "lib/net/bilibili.d.ts";
|
import { VideoListResponse } from "net/bilibili.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import netScheduler from "lib/mq/scheduler.ts";
|
import netScheduler from "mq/scheduler.ts";
|
||||||
|
|
||||||
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
|
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
|
||||||
const startFrom = 1 + pageSize * (page - 1);
|
const startFrom = 1 + pageSize * (page - 1);
|
@ -1,6 +1,6 @@
|
|||||||
import netScheduler from "lib/mq/scheduler.ts";
|
import netScheduler from "mq/scheduler.ts";
|
||||||
import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts";
|
import { VideoDetailsData, VideoDetailsResponse } from "net/bilibili.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
|
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
|
||||||
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
|
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
|
@ -1,6 +1,6 @@
|
|||||||
import netScheduler from "lib/mq/scheduler.ts";
|
import netScheduler from "mq/scheduler.ts";
|
||||||
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
|
import { VideoInfoData, VideoInfoResponse } from "net/bilibili.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Fetch video metadata from bilibili API
|
* Fetch video metadata from bilibili API
|
@ -2,7 +2,7 @@ import express from "express";
|
|||||||
import { createBullBoard } from "@bull-board/api";
|
import { createBullBoard } from "@bull-board/api";
|
||||||
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
||||||
import { ExpressAdapter } from "@bull-board/express";
|
import { ExpressAdapter } from "@bull-board/express";
|
||||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
|
||||||
|
|
||||||
const serverAdapter = new ExpressAdapter();
|
const serverAdapter = new ExpressAdapter();
|
||||||
serverAdapter.setBasePath("/");
|
serverAdapter.setBasePath("/");
|
@ -1,10 +1,10 @@
|
|||||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
|
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
|
||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import Akari from "lib/ml/akari.ts";
|
import Akari from "ml/akari.ts";
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
3
packages/crawler/src/jobAdder.ts
Normal file
3
packages/crawler/src/jobAdder.ts
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
import { initMQ } from "mq/init.ts";
|
||||||
|
|
||||||
|
await initMQ();
|
@ -1,17 +1,19 @@
|
|||||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||||
import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts";
|
import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
|
||||||
import {
|
import {
|
||||||
|
bulkSnapshotTickWorker,
|
||||||
collectMilestoneSnapshotsWorker,
|
collectMilestoneSnapshotsWorker,
|
||||||
regularSnapshotsWorker,
|
regularSnapshotsWorker,
|
||||||
|
scheduleCleanupWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
|
takeBulkSnapshotForVideosWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
scheduleCleanupWorker
|
} from "mq/exec/snapshotTick.ts";
|
||||||
} from "lib/mq/exec/snapshotTick.ts";
|
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
@ -84,6 +86,12 @@ const snapshotWorker = new Worker(
|
|||||||
case "scheduleCleanup":
|
case "scheduleCleanup":
|
||||||
await scheduleCleanupWorker(job);
|
await scheduleCleanupWorker(job);
|
||||||
break;
|
break;
|
||||||
|
case "bulkSnapshotVideo":
|
||||||
|
await takeBulkSnapshotForVideosWorker(job);
|
||||||
|
break;
|
||||||
|
case "bulkSnapshotTick":
|
||||||
|
await bulkSnapshotTickWorker(job);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue
Block a user