From 79a37d927a023b2992400285e176f7e1d98a594a Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 19 May 2025 00:10:33 +0800 Subject: [PATCH 1/7] fix: forget to specify type when collecting videos without active schedules; missing return when eta too long for milestone snapshot --- packages/crawler/db/snapshotSchedule.ts | 62 +++++++++---------- .../mq/exec/dispatchMilestoneSnapshots.ts | 4 +- .../mq/exec/dispatchRegularSnapshots.ts | 7 ++- packages/crawler/mq/exec/snapshotVideo.ts | 13 ++-- 4 files changed, 40 insertions(+), 46 deletions(-) diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index 6fcfdc1..d10340d 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -63,18 +63,6 @@ export async function snapshotScheduleExists(sql: Psql, id: number) { return rows.length > 0; } -export async function videoHasActiveSchedule(sql: Psql, aid: number) { - const rows = await sql<{ status: string }[]>` - SELECT status - FROM snapshot_schedule - WHERE aid = ${aid} - AND (status = 'pending' - OR status = 'processing' - ) - ` - return rows.length > 0; -} - export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) { const rows = await sql<{ status: string }[]>` SELECT status FROM snapshot_schedule @@ -292,23 +280,23 @@ export async function adjustSnapshotTime( } export async function getSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` - SELECT * - FROM snapshot_schedule - WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' - ORDER BY - CASE - WHEN type = 'milestone' THEN 0 - ELSE 1 - END, - started_at - LIMIT 10; - ` - return rows; + return sql` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '1 seconds' + AND status = 'pending' + AND type != 'normal' + ORDER BY CASE + WHEN type = 'milestone' THEN 0 + ELSE 1 + END, + started_at + LIMIT 10; + `; } export async function getBulkSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` + return sql` SELECT * FROM snapshot_schedule WHERE (started_at <= NOW() + INTERVAL '15 seconds') @@ -320,27 +308,33 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) { END, started_at LIMIT 1000; - ` - return rows; + `; } export async function setSnapshotStatus(sql: Psql, id: number, status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id} + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ${id} `; } export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids}) + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ANY (${ids}) `; } -export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) { +export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) { const rows = await sql<{ aid: string }[]>` SELECT s.aid FROM songs s - LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + LEFT JOIN snapshot_schedule ss ON + s.aid = ss.aid AND + (ss.status = 'pending' OR ss.status = 'processing') AND + ss.type = ${type} WHERE ss.aid IS NULL `; return rows.map((r) => Number(r.aid)); diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 2cbcd08..b17472b 100644 --- 
a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { if (eta > 144) continue; const now = Date.now(); const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 1 * HOUR; - const minInterval = 1 * SECOND; + const maxInterval = 1.2 * HOUR; + const minInterval = 2 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; await scheduleSnapshot(sql, aid, "milestone", targetTime); diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index 49a7893..3c4feb7 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -1,7 +1,10 @@ import { Job } from "bullmq"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { truncate } from "utils/truncate.ts"; -import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { + getVideosWithoutActiveSnapshotScheduleByType, + scheduleSnapshot +} from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, WEEK } from "@core/const/time.ts"; import { lockManager } from "@core/mq/lockManager.ts"; @@ -17,7 +20,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise = } await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); - const aids = await getVideosWithoutActiveSnapshotSchedule(sql); + const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal"); for (const rawAid of aids) { const aid = Number(rawAid); const latestSnapshot = await getLatestVideoSnapshot(sql, aid); diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index 59f05db..e638974 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -78,8 +78,9 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `ETA (${etaHoursString}) too long for milestone snapshot. 
aid: ${aid}.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:snapshotVideoWorker", ); + return; } const now = Date.now(); const targetTime = now + eta * HOUR; @@ -92,7 +93,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `No available proxy for aid ${job.data.aid}.`, "mq", - "fn:takeSnapshotForVideoWorker", + "fn:snapshotVideoWorker", ); await setSnapshotStatus(sql, id, "no_proxy"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); @@ -102,16 +103,12 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Failed to proxy request for aid ${job.data.aid}: ${e.message}`, "mq", - "fn:takeSnapshotForVideoWorker", + "fn:snapshotVideoWorker", ); await setSnapshotStatus(sql, id, "failed"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); } - logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + logger.error(e as Error, "mq", "fn:snapshotVideoWorker"); await setSnapshotStatus(sql, id, "failed"); } - finally { - await lockManager.releaseLock("dispatchRegularSnapshots"); - }; - return; }; From cf7a285f5740676dc04f331bec488a579e8a14b0 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 19 May 2025 00:11:53 +0800 Subject: [PATCH 2/7] fix: incorrect import for psql.d.ts, remove unnecessary benchmark files --- packages/crawler/ml/benchmark.ts | 179 ------------------ packages/crawler/ml/quant_benchmark.ts | 171 ----------------- packages/crawler/mq/scheduling.ts | 2 +- .../mq/task/regularSnapshotInterval.ts | 2 +- 4 files changed, 2 insertions(+), 352 deletions(-) delete mode 100644 packages/crawler/ml/benchmark.ts delete mode 100644 packages/crawler/ml/quant_benchmark.ts diff --git a/packages/crawler/ml/benchmark.ts b/packages/crawler/ml/benchmark.ts deleted file mode 100644 index 3fc76ac..0000000 --- a/packages/crawler/ml/benchmark.ts +++ /dev/null @@ -1,179 +0,0 @@ -import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; -import * as ort from "onnxruntime"; - -function softmax(logits: Float32Array): number[] { - const maxLogit = Math.max(...logits); - const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); - const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0); - return Array.from(exponents.map((exp) => exp / sumOfExponents)); -} - -// 配置参数 -const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; -const onnxClassifierPath = "./model/video_classifier_v3_17.onnx"; -const onnxEmbeddingPath = "./model/embedding_original.onnx"; -const testDataPath = "./data/filter/test1.jsonl"; - -// 初始化会话 -const [sessionClassifier, sessionEmbedding] = await Promise.all([ - ort.InferenceSession.create(onnxClassifierPath), - ort.InferenceSession.create(onnxEmbeddingPath), -]); - -let tokenizer: PreTrainedTokenizer; - -// 初始化分词器 -async function loadTokenizer() { - const tokenizerConfig = { local_files_only: true }; - tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig); -} - -// 新的嵌入生成函数(使用ONNX) -async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise { - const { input_ids } = await tokenizer(texts, { - add_special_tokens: false, - return_tensor: false, - }); - - // 构造输入参数 - const cumsum = (arr: number[]): number[] => - arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []); - - const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))]; - const flattened_input_ids = input_ids.flat(); - - // 
准备ONNX输入 - const inputs = { - input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ - flattened_input_ids.length, - ]), - offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), - }; - - // 执行推理 - const { embeddings } = await session.run(inputs); - return Array.from(embeddings.data as Float32Array); -} - -// 分类推理函数 -async function runClassification(embeddings: number[]): Promise { - const inputTensor = new ort.Tensor( - Float32Array.from(embeddings), - [1, 3, 1024], - ); - - const { logits } = await sessionClassifier.run({ channel_features: inputTensor }); - return softmax(logits.data as Float32Array); -} - -// 指标计算函数 -function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): { - accuracy: number; - precision: number; - recall: number; - f1: number; - "Class 0 Prec": number; - speed: string; -} { - // 输出label和prediction不一样的index列表 - const arr = []; - for (let i = 0; i < labels.length; i++) { - if (labels[i] !== predictions[i] && predictions[i] == 0) { - arr.push([i + 1, labels[i], predictions[i]]); - } - } - console.log(arr); - // 初始化混淆矩阵 - const classCount = Math.max(...labels, ...predictions) + 1; - const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0)); - - // 填充矩阵 - labels.forEach((trueLabel, i) => { - matrix[trueLabel][predictions[i]]++; - }); - - // 计算各指标 - let totalTP = 0, totalFP = 0, totalFN = 0; - - for (let c = 0; c < classCount; c++) { - const TP = matrix[c][c]; - const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0); - const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0); - - totalTP += TP; - totalFP += FP; - totalFN += FN; - } - - const precision = totalTP / (totalTP + totalFP); - const recall = totalTP / (totalTP + totalFN); - const f1 = 2 * (precision * recall) / (precision + recall) || 0; - - // 计算Class 0 Precision - const class0TP = matrix[0][0]; - const class0FP = matrix.flatMap((row, i) => i === 0 ? 
[] : [row[0]]).reduce((a, b) => a + b, 0); - const class0Precision = class0TP / (class0TP + class0FP) || 0; - - return { - accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length, - precision, - recall, - f1, - speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`, - "Class 0 Prec": class0Precision, - }; -} - -// 改造后的评估函数 -async function evaluateModel(session: ort.InferenceSession): Promise<{ - accuracy: number; - precision: number; - recall: number; - f1: number; - "Class 0 Prec": number; -}> { - const data = await Deno.readTextFile(testDataPath); - const samples = data.split("\n") - .map((line) => { - try { - return JSON.parse(line); - } catch { - return null; - } - }) - .filter(Boolean); - - const allPredictions: number[] = []; - const allLabels: number[] = []; - - const t = new Date().getTime(); - for (const sample of samples) { - try { - const embeddings = await getONNXEmbeddings([ - sample.title, - sample.description, - sample.tags.join(","), - ], session); - - const probabilities = await runClassification(embeddings); - allPredictions.push(probabilities.indexOf(Math.max(...probabilities))); - allLabels.push(sample.label); - } catch (error) { - console.error("Processing error:", error); - } - } - const elapsed = new Date().getTime() - t; - - return calculateMetrics(allLabels, allPredictions, elapsed); -} - -// 主函数 -async function main() { - await loadTokenizer(); - - const metrics = await evaluateModel(sessionEmbedding); - console.log("Model Metrics:"); - console.table(metrics); -} - -await main(); diff --git a/packages/crawler/ml/quant_benchmark.ts b/packages/crawler/ml/quant_benchmark.ts deleted file mode 100644 index aab6308..0000000 --- a/packages/crawler/ml/quant_benchmark.ts +++ /dev/null @@ -1,171 +0,0 @@ -import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; -import * as ort from "onnxruntime"; - -function softmax(logits: Float32Array): number[] { - const maxLogit = Math.max(...logits); - const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); - const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0); - return Array.from(exponents.map((exp) => exp / sumOfExponents)); -} - -// 配置参数 -const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; -const onnxClassifierPath = "./model/video_classifier_v3_11.onnx"; -const onnxEmbeddingOriginalPath = "./model/embedding_original.onnx"; -const onnxEmbeddingQuantizedPath = "./model/embedding_original.onnx"; - -// 初始化会话 -const [sessionClassifier, sessionEmbeddingOriginal, sessionEmbeddingQuantized] = await Promise.all([ - ort.InferenceSession.create(onnxClassifierPath), - ort.InferenceSession.create(onnxEmbeddingOriginalPath), - ort.InferenceSession.create(onnxEmbeddingQuantizedPath), -]); - -let tokenizer: PreTrainedTokenizer; - -// 初始化分词器 -async function loadTokenizer() { - const tokenizerConfig = { local_files_only: true }; - tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig); -} - -// 新的嵌入生成函数(使用ONNX) -async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise { - const { input_ids } = await tokenizer(texts, { - add_special_tokens: false, - return_tensor: false, - }); - - // 构造输入参数 - const cumsum = (arr: number[]): number[] => - arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []); - - const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))]; - const flattened_input_ids = 
input_ids.flat(); - - // 准备ONNX输入 - const inputs = { - input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ - flattened_input_ids.length, - ]), - offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), - }; - - // 执行推理 - const { embeddings } = await session.run(inputs); - return Array.from(embeddings.data as Float32Array); -} - -// 分类推理函数 -async function runClassification(embeddings: number[]): Promise { - const inputTensor = new ort.Tensor( - Float32Array.from(embeddings), - [1, 4, 1024], - ); - - const { logits } = await sessionClassifier.run({ channel_features: inputTensor }); - return softmax(logits.data as Float32Array); -} - -// 指标计算函数 -function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): { - accuracy: number; - precision: number; - recall: number; - f1: number; - speed: string; -} { - // 初始化混淆矩阵 - const classCount = Math.max(...labels, ...predictions) + 1; - const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0)); - - // 填充矩阵 - labels.forEach((trueLabel, i) => { - matrix[trueLabel][predictions[i]]++; - }); - - // 计算各指标 - let totalTP = 0, totalFP = 0, totalFN = 0; - - for (let c = 0; c < classCount; c++) { - const TP = matrix[c][c]; - const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0); - const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0); - - totalTP += TP; - totalFP += FP; - totalFN += FN; - } - - const precision = totalTP / (totalTP + totalFP); - const recall = totalTP / (totalTP + totalFN); - const f1 = 2 * (precision * recall) / (precision + recall) || 0; - - return { - accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length, - precision, - recall, - f1, - speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`, - }; -} - -// 改造后的评估函数 -async function evaluateModel(session: ort.InferenceSession): Promise<{ - accuracy: number; - precision: number; - recall: number; - f1: number; -}> { - const data = await Deno.readTextFile("./data/filter/test1.jsonl"); - const samples = data.split("\n") - .map((line) => { - try { - return JSON.parse(line); - } catch { - return null; - } - }) - .filter(Boolean); - - const allPredictions: number[] = []; - const allLabels: number[] = []; - - const t = new Date().getTime(); - for (const sample of samples) { - try { - const embeddings = await getONNXEmbeddings([ - sample.title, - sample.description, - sample.tags.join(","), - sample.author_info, - ], session); - - const probabilities = await runClassification(embeddings); - allPredictions.push(probabilities.indexOf(Math.max(...probabilities))); - allLabels.push(sample.label); - } catch (error) { - console.error("Processing error:", error); - } - } - const elapsed = new Date().getTime() - t; - - return calculateMetrics(allLabels, allPredictions, elapsed); -} - -// 主函数 -async function main() { - await loadTokenizer(); - - // 评估原始模型 - const originalMetrics = await evaluateModel(sessionEmbeddingOriginal); - console.log("Original Model Metrics:"); - console.table(originalMetrics); - - // 评估量化模型 - const quantizedMetrics = await evaluateModel(sessionEmbeddingQuantized); - console.log("Quantized Model Metrics:"); - console.table(quantizedMetrics); -} - -await main(); diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index cf7427f..4ae81f6 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ 
-2,7 +2,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db import { truncate } from "utils/truncate.ts"; import { closetMilestone } from "./exec/snapshotTick.ts"; import { HOUR, MINUTE } from "@core/const/time.ts"; -import type { Psql } from "@core/db/global.d.ts"; +import type { Psql } from "@core/db/psql.d.ts"; const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); diff --git a/packages/crawler/mq/task/regularSnapshotInterval.ts b/packages/crawler/mq/task/regularSnapshotInterval.ts index 852d401..2490b44 100644 --- a/packages/crawler/mq/task/regularSnapshotInterval.ts +++ b/packages/crawler/mq/task/regularSnapshotInterval.ts @@ -1,6 +1,6 @@ import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts"; import { HOUR } from "@core/const/time.ts"; -import type { Psql } from "@core/db/global.d.ts"; +import type { Psql } from "@core/db/psql.d.ts"; export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => { const now = Date.now(); From 1ff71ab241b79375a657fcd342b1fbfdec115a89 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 19 May 2025 00:13:01 +0800 Subject: [PATCH 3/7] improve: code quality --- packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts | 2 +- packages/crawler/mq/exec/snapshotVideo.ts | 1 - packages/crawler/mq/scheduling.ts | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index b17472b..5634132 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -25,5 +25,5 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { } } catch (e) { logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); - }; + } } \ No newline at end of file diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index e638974..220ceb4 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -2,7 +2,6 @@ import { Job } from "bullmq"; import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, SECOND } from "@core/const/time.ts"; -import { lockManager } from "@core/mq/lockManager.ts"; import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { getSongsPublihsedAt } from "db/songs.ts"; diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index 4ae81f6..f80904a 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ -34,7 +34,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { if (!snapshotsEnough) return 0; const currentTimestamp = new Date().getTime(); - const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; + const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; From 3bc72720d1a00be0f0539108b982b761ef02d37d Mon Sep 17 00:00:00 2001 From: alikia2x Date: Tue, 27 May 2025 22:35:44 +0800 Subject: [PATCH 4/7] fix: remove bull-board from all scripts to prevent crashing --- packages/crawler/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff 
--git a/packages/crawler/package.json b/packages/crawler/package.json index 96abd83..b602ea4 100644 --- a/packages/crawler/package.json +++ b/packages/crawler/package.json @@ -7,7 +7,7 @@ "worker:filter": "bun run ./build/filterWorker.js", "adder": "bun run ./src/jobAdder.ts", "bullui": "bun run ./src/bullui.ts", - "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run bullui' 'bun run worker:filter'" + "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'" }, "devDependencies": { "concurrently": "^9.1.2" From 2b0497c83a0ae25b4ee6357ad96293ac133de580 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Tue, 27 May 2025 22:40:53 +0800 Subject: [PATCH 5/7] fix: incorrect timezone when adding to bilibili metadata --- packages/crawler/utils/formatTimestampToPostgre.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/crawler/utils/formatTimestampToPostgre.ts b/packages/crawler/utils/formatTimestampToPostgre.ts index 8dc37b2..a9b5714 100644 --- a/packages/crawler/utils/formatTimestampToPostgre.ts +++ b/packages/crawler/utils/formatTimestampToPostgre.ts @@ -1,6 +1,6 @@ export function formatTimestampToPsql(timestamp: number) { const date = new Date(timestamp); - return date.toISOString().slice(0, 23).replace("T", " "); + return date.toISOString().slice(0, 23).replace("T", " ") + '+08'; } export function parseTimestampFromPsql(timestamp: string) { From 1a20d5afe0df3865d8212f89a011938f3df592db Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 31 May 2025 12:13:56 +0800 Subject: [PATCH 6/7] update: schedule archive snapshots to next Saturday midnight fix: no expire when acquiring lock for classifyVideos ref: format --- bun.lock | 1 + package.json | 37 ++++++------ packages/crawler/db/bilibili_metadata.ts | 60 +++++++++---------- packages/crawler/db/snapshot.ts | 4 +- packages/crawler/db/snapshotSchedule.ts | 6 +- packages/crawler/db/songs.ts | 20 +++---- packages/crawler/ml/akari.ts | 21 +++---- packages/crawler/ml/const.ts | 2 +- packages/crawler/ml/manager.ts | 3 +- packages/crawler/mq/exec/archiveSnapshots.ts | 26 ++++++-- packages/crawler/mq/exec/classifyVideo.ts | 4 +- packages/crawler/mq/exec/collectSongs.ts | 4 +- .../mq/exec/dispatchMilestoneSnapshots.ts | 2 +- .../mq/exec/dispatchRegularSnapshots.ts | 5 +- packages/crawler/mq/exec/executors.ts | 2 +- packages/crawler/mq/exec/getLatestVideos.ts | 4 +- packages/crawler/mq/exec/getVideoInfo.ts | 2 +- packages/crawler/mq/exec/snapshotTick.ts | 32 ++++++---- packages/crawler/mq/exec/snapshotVideo.ts | 28 ++++----- packages/crawler/mq/exec/takeBulkSnapshot.ts | 10 +--- packages/crawler/mq/index.ts | 6 +- packages/crawler/mq/init.ts | 4 +- packages/crawler/mq/scheduling.ts | 11 ++-- packages/crawler/mq/task/collectSongs.ts | 2 +- packages/crawler/mq/task/getVideoDetails.ts | 8 +-- packages/crawler/mq/task/getVideoStats.ts | 12 ++-- packages/crawler/mq/task/queueLatestVideo.ts | 26 ++++---- .../mq/task/regularSnapshotInterval.ts | 2 +- .../mq/task/removeAllTimeoutSchedules.ts | 8 +-- packages/crawler/package.json | 3 +- packages/crawler/src/build.ts | 5 +- packages/crawler/src/bullui.ts | 4 +- packages/crawler/src/filterWorker.ts | 12 ++-- packages/crawler/src/worker.ts | 18 +++--- .../crawler/test/db/snapshotSchedule.test.ts | 52 ++++++++-------- .../crawler/utils/formatTimestampToPostgre.ts | 2 +- packages/crawler/vitest.config.ts | 4 +- 37 files changed, 226 insertions(+), 226 deletions(-) diff --git a/bun.lock b/bun.lock index 
251cf70..3147ce3 100644 --- a/bun.lock +++ b/bun.lock @@ -7,6 +7,7 @@ "postgres": "^3.4.5", }, "devDependencies": { + "prettier": "^3.5.3", "vite-tsconfig-paths": "^5.1.4", "vitest": "^3.1.2", "vitest-tsconfig-paths": "^3.4.1", diff --git a/package.json b/package.json index 2afb5de..2de6888 100644 --- a/package.json +++ b/package.json @@ -1,20 +1,21 @@ { - "name": "cvsa", - "version": "2.13.22", - "private": false, - "type": "module", - "workspaces": [ - "packages/frontend", - "packages/core", - "packages/backend", - "packages/crawler" - ], - "dependencies": { - "postgres": "^3.4.5" - }, - "devDependencies": { - "vite-tsconfig-paths": "^5.1.4", - "vitest": "^3.1.2", - "vitest-tsconfig-paths": "^3.4.1" - } + "name": "cvsa", + "version": "2.13.22", + "private": false, + "type": "module", + "workspaces": [ + "packages/frontend", + "packages/core", + "packages/backend", + "packages/crawler" + ], + "dependencies": { + "postgres": "^3.4.5" + }, + "devDependencies": { + "vite-tsconfig-paths": "^5.1.4", + "vitest": "^3.1.2", + "vitest-tsconfig-paths": "^3.4.1", + "prettier": "^3.5.3" + } } diff --git a/packages/crawler/db/bilibili_metadata.ts b/packages/crawler/db/bilibili_metadata.ts index 6c68516..f50c888 100644 --- a/packages/crawler/db/bilibili_metadata.ts +++ b/packages/crawler/db/bilibili_metadata.ts @@ -3,75 +3,75 @@ import { AllDataType, BiliUserType } from "@core/db/schema"; import { AkariModelVersion } from "ml/const"; export async function videoExistsInAllData(sql: Psql, aid: number) { - const rows = await sql<{ exists: boolean }[]>` + const rows = await sql<{ exists: boolean }[]>` SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = ${aid}) `; - return rows[0].exists; + return rows[0].exists; } export async function userExistsInBiliUsers(sql: Psql, uid: number) { - const rows = await sql<{ exists: boolean }[]>` + const rows = await sql<{ exists: boolean }[]>` SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = ${uid}) `; - return rows[0].exists; + return rows[0].exists; } export async function getUnlabelledVideos(sql: Psql) { - const rows = await sql<{ aid: number }[]>` + const rows = await sql<{ aid: number }[]>` SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL `; - return rows.map((row) => row.aid); + return rows.map((row) => row.aid); } export async function insertVideoLabel(sql: Psql, aid: number, label: number) { - await sql` + await sql` INSERT INTO labelling_result (aid, label, model_version) VALUES (${aid}, ${label}, ${AkariModelVersion}) ON CONFLICT (aid, model_version) DO NOTHING `; } export async function getVideoInfoFromAllData(sql: Psql, aid: number) { - const rows = await sql` + const rows = await sql` SELECT * FROM bilibili_metadata WHERE aid = ${aid} `; - const row = rows[0]; - let authorInfo = ""; - if (row.uid && await userExistsInBiliUsers(sql, row.uid)) { - const userRows = await sql` + const row = rows[0]; + let authorInfo = ""; + if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) { + const userRows = await sql` SELECT * FROM bilibili_user WHERE uid = ${row.uid} `; - const userRow = userRows[0]; - if (userRow) { - authorInfo = userRow.desc; - } - } - return { - title: row.title, - description: row.description, - tags: row.tags, - author_info: authorInfo, - }; + const userRow = userRows[0]; + if (userRow) { + authorInfo = userRow.desc; + } + } + return { + title: row.title, + description: row.description, + tags: row.tags, + author_info: authorInfo + }; } export async function 
getUnArchivedBiliUsers(sql: Psql) { - const rows = await sql<{ uid: number }[]>` + const rows = await sql<{ uid: number }[]>` SELECT ad.uid FROM bilibili_metadata ad LEFT JOIN bilibili_user bu ON ad.uid = bu.uid WHERE bu.uid IS NULL; `; - return rows.map((row) => row.uid); + return rows.map((row) => row.uid); } export async function setBiliVideoStatus(sql: Psql, aid: number, status: number) { - await sql` + await sql` UPDATE bilibili_metadata SET status = ${status} WHERE aid = ${aid} `; } export async function getBiliVideoStatus(sql: Psql, aid: number) { - const rows = await sql<{ status: number }[]>` + const rows = await sql<{ status: number }[]>` SELECT status FROM bilibili_metadata WHERE aid = ${aid} `; - if (rows.length === 0) return 0; - return rows[0].status; -} \ No newline at end of file + if (rows.length === 0) return 0; + return rows[0].status; +} diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts index 16df13c..0b4c5fe 100644 --- a/packages/crawler/db/snapshot.ts +++ b/packages/crawler/db/snapshot.ts @@ -22,7 +22,7 @@ export async function getVideosNearMilestone(sql: Psql) { return queryResult.map((row) => { return { ...row, - aid: Number(row.aid), + aid: Number(row.aid) }; }); } @@ -40,7 +40,7 @@ export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise 0; } @@ -90,7 +90,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n WHERE aid = ANY(${aids}) AND status != 'processing' GROUP BY aid - ` + `; return rows.map((row) => Number(row.aid)); } @@ -346,6 +346,6 @@ export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) { FROM bilibili_metadata s LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') WHERE ss.aid IS NULL - ` + `; return rows.map((r) => Number(r.aid)); } diff --git a/packages/crawler/db/songs.ts b/packages/crawler/db/songs.ts index ebdd08c..ee4f042 100644 --- a/packages/crawler/db/songs.ts +++ b/packages/crawler/db/songs.ts @@ -2,7 +2,7 @@ import type { Psql } from "@core/db/psql.d.ts"; import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts"; export async function getNotCollectedSongs(sql: Psql) { - const rows = await sql<{ aid: number }[]>` + const rows = await sql<{ aid: number }[]>` SELECT lr.aid FROM labelling_result lr WHERE lr.label != 0 @@ -12,28 +12,28 @@ export async function getNotCollectedSongs(sql: Psql) { WHERE s.aid = lr.aid ); `; - return rows.map((row) => row.aid); + return rows.map((row) => row.aid); } export async function aidExistsInSongs(sql: Psql, aid: number) { - const rows = await sql<{ exists: boolean }[]>` + const rows = await sql<{ exists: boolean }[]>` SELECT EXISTS ( SELECT 1 FROM songs WHERE aid = ${aid} ); `; - return rows[0].exists; + return rows[0].exists; } export async function getSongsPublihsedAt(sql: Psql, aid: number) { - const rows = await sql<{ published_at: string }[]>` + const rows = await sql<{ published_at: string }[]>` SELECT published_at FROM songs WHERE aid = ${aid}; `; - if (rows.length === 0) { - return null; - } - return parseTimestampFromPsql(rows[0].published_at); -} \ No newline at end of file + if (rows.length === 0) { + return null; + } + return parseTimestampFromPsql(rows[0].published_at); +} diff --git a/packages/crawler/ml/akari.ts b/packages/crawler/ml/akari.ts index d76317f..ebf085f 100644 --- a/packages/crawler/ml/akari.ts +++ b/packages/crawler/ml/akari.ts @@ -16,8 +16,8 @@ class AkariProto extends AIManager { constructor() { super(); 
this.models = { - "classifier": onnxClassifierPath, - "embedding": onnxEmbeddingPath, + classifier: onnxClassifierPath, + embedding: onnxEmbeddingPath }; } @@ -55,7 +55,7 @@ class AkariProto extends AIManager { const { input_ids } = await tokenizer(texts, { add_special_tokens: false, - return_tensor: false, + return_tensor: false }); const cumsum = (arr: number[]): number[] => @@ -66,9 +66,9 @@ class AkariProto extends AIManager { const inputs = { input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ - flattened_input_ids.length, + flattened_input_ids.length ]), - offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), + offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]) }; const { embeddings } = await session.run(inputs); @@ -77,21 +77,14 @@ class AkariProto extends AIManager { private async runClassification(embeddings: number[]): Promise { const session = this.getModelSession("classifier"); - const inputTensor = new ort.Tensor( - Float32Array.from(embeddings), - [1, 3, 1024], - ); + const inputTensor = new ort.Tensor(Float32Array.from(embeddings), [1, 3, 1024]); const { logits } = await session.run({ channel_features: inputTensor }); return this.softmax(logits.data as Float32Array); } public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise { - const embeddings = await this.getJinaEmbeddings1024([ - title, - description, - tags, - ]); + const embeddings = await this.getJinaEmbeddings1024([title, description, tags]); const probabilities = await this.runClassification(embeddings); if (aid) { logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml"); diff --git a/packages/crawler/ml/const.ts b/packages/crawler/ml/const.ts index 4eb1525..a2f1ea9 100644 --- a/packages/crawler/ml/const.ts +++ b/packages/crawler/ml/const.ts @@ -1 +1 @@ -export const AkariModelVersion = "3.17"; \ No newline at end of file +export const AkariModelVersion = "3.17"; diff --git a/packages/crawler/ml/manager.ts b/packages/crawler/ml/manager.ts index d876844..d193bc6 100644 --- a/packages/crawler/ml/manager.ts +++ b/packages/crawler/ml/manager.ts @@ -6,8 +6,7 @@ export class AIManager { public sessions: { [key: string]: ort.InferenceSession } = {}; public models: { [key: string]: string } = {}; - constructor() { - } + constructor() {} public async init() { const modelKeys = Object.keys(this.models); diff --git a/packages/crawler/mq/exec/archiveSnapshots.ts b/packages/crawler/mq/exec/archiveSnapshots.ts index c667416..f6e67ef 100644 --- a/packages/crawler/mq/exec/archiveSnapshots.ts +++ b/packages/crawler/mq/exec/archiveSnapshots.ts @@ -3,9 +3,26 @@ import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/ import logger from "@core/log/logger.ts"; import { lockManager } from "@core/mq/lockManager.ts"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; -import { HOUR, MINUTE } from "@core/const/time.ts"; +import { MINUTE } from "@core/const/time.ts"; import { sql } from "@core/db/dbNew"; +function getNextSaturdayMidnightTimestamp(): number { + const now = new Date(); + const currentDay = now.getDay(); + + let daysUntilNextSaturday = (6 - currentDay + 7) % 7; + + if (daysUntilNextSaturday === 0) { + daysUntilNextSaturday = 7; + } + + const nextSaturday = new Date(now); + nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday); + nextSaturday.setHours(0, 0, 0, 0); + + return 
nextSaturday.getTime(); +} + export const archiveSnapshotsWorker = async (_job: Job) => { try { const startedAt = Date.now(); @@ -20,15 +37,16 @@ export const archiveSnapshotsWorker = async (_job: Job) => { const latestSnapshot = await getLatestVideoSnapshot(sql, aid); const now = Date.now(); const lastSnapshotedAt = latestSnapshot?.time ?? now; - const interval = 168; + const nextSatMidnight = getNextSaturdayMidnightTimestamp(); + const interval = nextSatMidnight - now; logger.log( `Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker" ); - const targetTime = lastSnapshotedAt + interval * HOUR; + const targetTime = lastSnapshotedAt + interval; await scheduleSnapshot(sql, aid, "archive", targetTime); - if (now - startedAt > 250 * MINUTE) { + if (now - startedAt > 30 * MINUTE) { return; } } diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index 6c3d37c..4a4a58a 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -34,7 +34,7 @@ export const classifyVideoWorker = async (job: Job) => { await job.updateData({ ...job.data, - label: label, + label: label }); return 0; @@ -46,7 +46,7 @@ export const classifyVideosWorker = async () => { return; } - await lockManager.acquireLock("classifyVideos"); + await lockManager.acquireLock("classifyVideos", 5 * 60); const videos = await getUnlabelledVideos(sql); logger.log(`Found ${videos.length} unlabelled videos`); diff --git a/packages/crawler/mq/exec/collectSongs.ts b/packages/crawler/mq/exec/collectSongs.ts index bd58179..b354c2b 100644 --- a/packages/crawler/mq/exec/collectSongs.ts +++ b/packages/crawler/mq/exec/collectSongs.ts @@ -1,7 +1,7 @@ import { Job } from "bullmq"; import { collectSongs } from "mq/task/collectSongs.ts"; -export const collectSongsWorker = async (_job: Job): Promise =>{ +export const collectSongsWorker = async (_job: Job): Promise => { await collectSongs(); return; -} \ No newline at end of file +}; diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 5634132..1ed8b26 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -26,4 +26,4 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { } catch (e) { logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); } -} \ No newline at end of file +}; diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index 3c4feb7..9b47b03 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -1,10 +1,7 @@ import { Job } from "bullmq"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { truncate } from "utils/truncate.ts"; -import { - getVideosWithoutActiveSnapshotScheduleByType, - scheduleSnapshot -} from "db/snapshotSchedule.ts"; +import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, WEEK } from "@core/const/time.ts"; import { lockManager } from "@core/mq/lockManager.ts"; diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index f7b9cf8..c2969ed 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -7,4 +7,4 @@ export * 
from "./dispatchMilestoneSnapshots.ts"; export * from "./dispatchRegularSnapshots.ts"; export * from "./snapshotVideo.ts"; export * from "./scheduleCleanup.ts"; -export * from "./snapshotTick.ts"; \ No newline at end of file +export * from "./snapshotTick.ts"; diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index b3d93f1..f2f4351 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -2,6 +2,6 @@ import { sql } from "@core/db/dbNew"; import { Job } from "bullmq"; import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; -export const getLatestVideosWorker = async (_job: Job): Promise =>{ +export const getLatestVideosWorker = async (_job: Job): Promise => { await queueLatestVideos(sql); -} +}; diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts index ee96b2d..889bd7e 100644 --- a/packages/crawler/mq/exec/getVideoInfo.ts +++ b/packages/crawler/mq/exec/getVideoInfo.ts @@ -10,4 +10,4 @@ export const getVideoInfoWorker = async (job: Job): Promise => { return; } await insertVideoInfo(sql, aid); -} +}; diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index d599fca..ca596c8 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -5,15 +5,15 @@ import { getBulkSnapshotsInNextSecond, getSnapshotsInNextSecond, setSnapshotStatus, - videoHasProcessingSchedule, + videoHasProcessingSchedule } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { SnapshotQueue } from "mq/index.ts"; import { sql } from "@core/db/dbNew"; const priorityMap: { [key: string]: number } = { - "milestone": 1, - "normal": 3, + milestone: 1, + normal: 3 }; export const bulkSnapshotTickWorker = async (_job: Job) => { @@ -35,12 +35,16 @@ export const bulkSnapshotTickWorker = async (_job: Job) => { created_at: schedule.created_at, started_at: schedule.started_at, finished_at: schedule.finished_at, - status: schedule.status, + status: schedule.status }; }); - await SnapshotQueue.add("bulkSnapshotVideo", { - schedules: schedulesData, - }, { priority: 3 }); + await SnapshotQueue.add( + "bulkSnapshotVideo", + { + schedules: schedulesData + }, + { priority: 3 } + ); } return `OK`; } catch (e) { @@ -61,11 +65,15 @@ export const snapshotTickWorker = async (_job: Job) => { } const aid = Number(schedule.aid); await setSnapshotStatus(sql, schedule.id, "processing"); - await SnapshotQueue.add("snapshotVideo", { - aid: Number(aid), - id: Number(schedule.id), - type: schedule.type ?? "normal", - }, { priority }); + await SnapshotQueue.add( + "snapshotVideo", + { + aid: Number(aid), + id: Number(schedule.id), + type: schedule.type ?? 
"normal" + }, + { priority } + ); } return `OK`; } catch (e) { diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index 220ceb4..8d8d241 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -10,9 +10,9 @@ import { NetSchedulerError } from "@core/net/delegate.ts"; import { sql } from "@core/db/dbNew.ts"; const snapshotTypeToTaskMap: { [key: string]: string } = { - "milestone": "snapshotMilestoneVideo", - "normal": "snapshotVideo", - "new": "snapshotMilestoneVideo", + milestone: "snapshotMilestoneVideo", + normal: "snapshotVideo", + new: "snapshotMilestoneVideo" }; export const snapshotVideoWorker = async (job: Job): Promise => { @@ -31,7 +31,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Video ${aid} has status ${status} in the database. Abort snapshoting.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:dispatchRegularSnapshotsWorker" ); return; } @@ -43,7 +43,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Bilibili return status ${status} when snapshoting for ${aid}.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:dispatchRegularSnapshotsWorker" ); return; } @@ -51,7 +51,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { if (type === "new") { const publihsedAt = await getSongsPublihsedAt(sql, aid); const timeSincePublished = stat.time - publihsedAt!; - const viewsPerHour = stat.views / timeSincePublished * HOUR; + const viewsPerHour = (stat.views / timeSincePublished) * HOUR; if (timeSincePublished > 48 * HOUR) { return; } @@ -77,7 +77,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `ETA (${etaHoursString}) too long for milestone snapshot. 
aid: ${aid}.`, "mq", - "fn:snapshotVideoWorker", + "fn:snapshotVideoWorker" ); return; } @@ -86,23 +86,17 @@ export const snapshotVideoWorker = async (job: Job): Promise => { await scheduleSnapshot(sql, aid, type, targetTime); await setSnapshotStatus(sql, id, "completed"); return; - } - catch (e) { + } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { - logger.warn( - `No available proxy for aid ${job.data.aid}.`, - "mq", - "fn:snapshotVideoWorker", - ); + logger.warn(`No available proxy for aid ${job.data.aid}.`, "mq", "fn:snapshotVideoWorker"); await setSnapshotStatus(sql, id, "no_proxy"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); return; - } - else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") { + } else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") { logger.warn( `Failed to proxy request for aid ${job.data.aid}: ${e.message}`, "mq", - "fn:snapshotVideoWorker", + "fn:snapshotVideoWorker" ); await setSnapshotStatus(sql, id, "failed"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index a71add1..afaf239 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -3,7 +3,7 @@ import { bulkScheduleSnapshot, bulkSetSnapshotStatus, scheduleSnapshot, - snapshotScheduleExists, + snapshotScheduleExists } from "db/snapshotSchedule.ts"; import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts"; import logger from "@core/log/logger.ts"; @@ -55,7 +55,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { ${shares}, ${favorites} ) - ` + `; logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker"); } @@ -72,11 +72,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { return `DONE`; } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { - logger.warn( - `No available proxy for bulk request now.`, - "mq", - "fn:takeBulkSnapshotForVideosWorker", - ); + logger.warn(`No available proxy for bulk request now.`, "mq", "fn:takeBulkSnapshotForVideosWorker"); await bulkSetSnapshotStatus(sql, ids, "no_proxy"); await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random()); return; diff --git a/packages/crawler/mq/index.ts b/packages/crawler/mq/index.ts index 62d5d86..0644d05 100644 --- a/packages/crawler/mq/index.ts +++ b/packages/crawler/mq/index.ts @@ -2,13 +2,13 @@ import { Queue, ConnectionOptions } from "bullmq"; import { redis } from "@core/db/redis.ts"; export const LatestVideosQueue = new Queue("latestVideos", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); export const ClassifyVideoQueue = new Queue("classifyVideo", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); export const SnapshotQueue = new Queue("snapshot", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index f646c08..e6c5e52 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -62,8 +62,8 @@ export async function initMQ() { }); await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", { - every: 6 * HOUR, - immediately: true + every: 2 * HOUR, + immediately: false }); await 
SnapshotQueue.upsertJobScheduler("scheduleCleanup", { diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index f80904a..dcc8ad3 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ -12,13 +12,12 @@ const getFactor = (x: number) => { const c = 100; const u = 0.601; const g = 455; - if (x>g) { - return log(b/log(x+1),a); + if (x > g) { + return log(b / log(x + 1), a); + } else { + return log(b / log(x + c), a) + u; } - else { - return log(b/log(x+c),a)+u; - } -} +}; /* * Returns the minimum ETA in hours for the next snapshot diff --git a/packages/crawler/mq/task/collectSongs.ts b/packages/crawler/mq/task/collectSongs.ts index dcf5472..af3178e 100644 --- a/packages/crawler/mq/task/collectSongs.ts +++ b/packages/crawler/mq/task/collectSongs.ts @@ -25,5 +25,5 @@ export async function insertIntoSongs(sql: Psql, aid: number) { (SELECT duration FROM bilibili_metadata WHERE aid = ${aid}) ) ON CONFLICT DO NOTHING - ` + `; } diff --git a/packages/crawler/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts index 1fc618a..cd67994 100644 --- a/packages/crawler/mq/task/getVideoDetails.ts +++ b/packages/crawler/mq/task/getVideoDetails.ts @@ -18,9 +18,9 @@ export async function insertVideoInfo(sql: Psql, aid: number) { const bvid = data.View.bvid; const desc = data.View.desc; const uid = data.View.owner.mid; - const tags = data.Tags - .filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) - .map((tag) => tag.tag_name).join(","); + const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) + .map((tag) => tag.tag_name) + .join(","); const title = data.View.title; const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const duration = data.View.duration; @@ -55,7 +55,7 @@ export async function insertVideoInfo(sql: Psql, aid: number) { ${stat.share}, ${stat.favorite} ) - ` + `; logger.log(`Inserted video metadata for aid: ${aid}`, "mq"); await ClassifyVideoQueue.add("classifyVideo", { aid }); diff --git a/packages/crawler/mq/task/getVideoStats.ts b/packages/crawler/mq/task/getVideoStats.ts index ffec09f..afd5b72 100644 --- a/packages/crawler/mq/task/getVideoStats.ts +++ b/packages/crawler/mq/task/getVideoStats.ts @@ -24,11 +24,7 @@ export interface SnapshotNumber { * - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` */ -export async function insertVideoSnapshot( - sql: Psql, - aid: number, - task: string, -): Promise { +export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise { const data = await getVideoInfo(aid, task); if (typeof data == "number") { return data; @@ -42,10 +38,10 @@ export async function insertVideoSnapshot( const shares = data.stat.share; const favorites = data.stat.favorite; - await sql` + await sql` INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites}) - ` + `; logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); @@ -58,6 +54,6 @@ export async function insertVideoSnapshot( coins, shares, favorites, - time, + time }; } diff --git a/packages/crawler/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts index 9f583c2..193787a 100644 --- a/packages/crawler/mq/task/queueLatestVideo.ts +++ 
b/packages/crawler/mq/task/queueLatestVideo.ts @@ -6,9 +6,7 @@ import logger from "@core/log/logger.ts"; import { LatestVideosQueue } from "mq/index.ts"; import type { Psql } from "@core/db/psql.d.ts"; -export async function queueLatestVideos( - sql: Psql, -): Promise { +export async function queueLatestVideos(sql: Psql): Promise { let page = 1; let i = 0; const videosFound = new Set(); @@ -26,14 +24,18 @@ export async function queueLatestVideos( if (videoExists) { continue; } - await LatestVideosQueue.add("getVideoInfo", { aid }, { - delay, - attempts: 100, - backoff: { - type: "fixed", - delay: SECOND * 5, - }, - }); + await LatestVideosQueue.add( + "getVideoInfo", + { aid }, + { + delay, + attempts: 100, + backoff: { + type: "fixed", + delay: SECOND * 5 + } + } + ); videosFound.add(aid); allExists = false; delay += Math.random() * SECOND * 1.5; @@ -42,7 +44,7 @@ export async function queueLatestVideos( logger.log( `Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`, "net", - "fn:queueLatestVideos()", + "fn:queueLatestVideos()" ); if (allExists) { return 0; diff --git a/packages/crawler/mq/task/regularSnapshotInterval.ts b/packages/crawler/mq/task/regularSnapshotInterval.ts index 2490b44..306865a 100644 --- a/packages/crawler/mq/task/regularSnapshotInterval.ts +++ b/packages/crawler/mq/task/regularSnapshotInterval.ts @@ -14,7 +14,7 @@ export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => { if (hoursDiff < 8) return 24; const viewsDiff = latestSnapshot.views - oldSnapshot.views; if (viewsDiff === 0) return 72; - const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24; + const speedPerDay = (viewsDiff / (hoursDiff + 0.001)) * 24; if (speedPerDay < 6) return 36; if (speedPerDay < 120) return 24; if (speedPerDay < 320) return 12; diff --git a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts index ccca69a..325f08b 100644 --- a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts +++ b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts @@ -2,14 +2,10 @@ import { sql } from "@core/db/dbNew"; import logger from "@core/log/logger.ts"; export async function removeAllTimeoutSchedules() { - logger.log( - "Too many timeout schedules, directly removing these schedules...", - "mq", - "fn:scheduleCleanupWorker", - ); + logger.log("Too many timeout schedules, directly removing these schedules...", "mq", "fn:scheduleCleanupWorker"); return await sql` DELETE FROM snapshot_schedule WHERE status IN ('pending', 'processing') AND started_at < NOW() - INTERVAL '30 minutes' `; -} \ No newline at end of file +} diff --git a/packages/crawler/package.json b/packages/crawler/package.json index b602ea4..9e84eb1 100644 --- a/packages/crawler/package.json +++ b/packages/crawler/package.json @@ -7,7 +7,8 @@ "worker:filter": "bun run ./build/filterWorker.js", "adder": "bun run ./src/jobAdder.ts", "bullui": "bun run ./src/bullui.ts", - "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'" + "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'", + "format": "prettier --write ." 
}, "devDependencies": { "concurrently": "^9.1.2" diff --git a/packages/crawler/src/build.ts b/packages/crawler/src/build.ts index 942672c..42b7210 100644 --- a/packages/crawler/src/build.ts +++ b/packages/crawler/src/build.ts @@ -3,13 +3,12 @@ import Bun from "bun"; await Bun.build({ entrypoints: ["./src/filterWorker.ts"], outdir: "./build", - target: "node" + target: "node" }); - const file = Bun.file("./build/filterWorker.js"); const code = await file.text(); const modifiedCode = code.replaceAll("../bin/napi-v3/", "../../../node_modules/onnxruntime-node/bin/napi-v3/"); -await Bun.write("./build/filterWorker.js", modifiedCode); \ No newline at end of file +await Bun.write("./build/filterWorker.js", modifiedCode); diff --git a/packages/crawler/src/bullui.ts b/packages/crawler/src/bullui.ts index 5765540..8f2dcc7 100644 --- a/packages/crawler/src/bullui.ts +++ b/packages/crawler/src/bullui.ts @@ -11,9 +11,9 @@ createBullBoard({ queues: [ new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(ClassifyVideoQueue), - new BullMQAdapter(SnapshotQueue), + new BullMQAdapter(SnapshotQueue) ], - serverAdapter: serverAdapter, + serverAdapter: serverAdapter }); const app = express(); diff --git a/packages/crawler/src/filterWorker.ts b/packages/crawler/src/filterWorker.ts index ab85cba..c740336 100644 --- a/packages/crawler/src/filterWorker.ts +++ b/packages/crawler/src/filterWorker.ts @@ -7,13 +7,13 @@ import { lockManager } from "@core/mq/lockManager.ts"; import Akari from "ml/akari.ts"; const shutdown = async (signal: string) => { - logger.log(`${signal} Received: Shutting down workers...`, "mq"); - await filterWorker.close(true); - process.exit(0); + logger.log(`${signal} Received: Shutting down workers...`, "mq"); + await filterWorker.close(true); + process.exit(0); }; -process.on('SIGINT', () => shutdown('SIGINT')); -process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on("SIGINT", () => shutdown("SIGINT")); +process.on("SIGTERM", () => shutdown("SIGTERM")); await Akari.init(); @@ -29,7 +29,7 @@ const filterWorker = new Worker( break; } }, - { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }, + { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } } ); filterWorker.on("active", () => { diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 534f488..418c343 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -10,7 +10,7 @@ import { scheduleCleanupWorker, snapshotTickWorker, snapshotVideoWorker, - takeBulkSnapshotForVideosWorker, + takeBulkSnapshotForVideosWorker } from "mq/exec/executors.ts"; import { redis } from "@core/db/redis.ts"; import logger from "@core/log/logger.ts"; @@ -30,15 +30,15 @@ const releaseAllLocks = async () => { }; const shutdown = async (signal: string) => { - logger.log(`${signal} Received: Shutting down workers...`, "mq"); - await releaseAllLocks(); + logger.log(`${signal} Received: Shutting down workers...`, "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); - process.exit(0); + process.exit(0); }; -process.on('SIGINT', () => shutdown('SIGINT')); -process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on("SIGINT", () => shutdown("SIGINT")); +process.on("SIGTERM", () => shutdown("SIGTERM")); const latestVideoWorker = new Worker( "latestVideos", @@ -58,8 +58,8 @@ const latestVideoWorker = new Worker( connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: 
{ count: 1440 }, - removeOnFail: { count: 0 }, - }, + removeOnFail: { count: 0 } + } ); latestVideoWorker.on("active", () => { @@ -95,7 +95,7 @@ const snapshotWorker = new Worker( break; } }, - { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }, + { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } } ); snapshotWorker.on("error", (err) => { diff --git a/packages/crawler/test/db/snapshotSchedule.test.ts b/packages/crawler/test/db/snapshotSchedule.test.ts index b0ff0c1..28de210 100644 --- a/packages/crawler/test/db/snapshotSchedule.test.ts +++ b/packages/crawler/test/db/snapshotSchedule.test.ts @@ -56,65 +56,65 @@ const databasePreparationQuery = ` CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status); CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type); CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id); -` +`; const cleanUpQuery = ` DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE; DROP TABLE IF EXISTS "snapshot_schedule" CASCADE; -` +`; async function testMocking() { - await sql.begin(async tx => { - await tx.unsafe(cleanUpQuery).simple(); + await sql.begin(async (tx) => { + await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(databasePreparationQuery).simple(); - - await tx` + + await tx` INSERT INTO snapshot_schedule - ${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} + ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")} `; - await tx` + await tx` ROLLBACK; - ` + `; - await tx.unsafe(cleanUpQuery).simple(); - return; + await tx.unsafe(cleanUpQuery).simple(); + return; }); } async function testBulkSetSnapshotStatus() { - return await sql.begin(async tx => { - await tx.unsafe(cleanUpQuery).simple(); - await tx.unsafe(databasePreparationQuery).simple(); + return await sql.begin(async (tx) => { + await tx.unsafe(cleanUpQuery).simple(); + await tx.unsafe(databasePreparationQuery).simple(); - await tx` + await tx` INSERT INTO snapshot_schedule - ${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} + ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")} `; const ids = [1, 2, 3]; - - await bulkSetSnapshotStatus(tx, ids, 'pending') - const rows = tx<{status: string}[]>` + await bulkSetSnapshotStatus(tx, ids, "pending"); + + const rows = tx<{ status: string }[]>` SELECT status FROM snapshot_schedule WHERE id = 1; `.execute(); - await tx` + await tx` ROLLBACK; - ` + `; - await tx.unsafe(cleanUpQuery).simple(); - return rows; + await tx.unsafe(cleanUpQuery).simple(); + return rows; }); } test("data mocking works", async () => { - await testMocking(); + await testMocking(); expect(() => {}).not.toThrowError(); }); test("bulkSetSnapshotStatus core logic works smoothly", async () => { - const rows = await testBulkSetSnapshotStatus(); - expect(rows.every(item => item.status === 'pending')).toBe(true); + const rows = await testBulkSetSnapshotStatus(); + expect(rows.every((item) => item.status === "pending")).toBe(true); }); diff --git a/packages/crawler/utils/formatTimestampToPostgre.ts b/packages/crawler/utils/formatTimestampToPostgre.ts index a9b5714..fa444b5 100644 --- a/packages/crawler/utils/formatTimestampToPostgre.ts +++ b/packages/crawler/utils/formatTimestampToPostgre.ts @@ -1,6 +1,6 @@ export function 
formatTimestampToPsql(timestamp: number) { const date = new Date(timestamp); - return date.toISOString().slice(0, 23).replace("T", " ") + '+08'; + return date.toISOString().slice(0, 23).replace("T", " ") + "+08"; } export function parseTimestampFromPsql(timestamp: string) { diff --git a/packages/crawler/vitest.config.ts b/packages/crawler/vitest.config.ts index fb5072b..63af38b 100644 --- a/packages/crawler/vitest.config.ts +++ b/packages/crawler/vitest.config.ts @@ -1,6 +1,6 @@ import { defineConfig } from "vitest/config"; -import tsconfigPaths from 'vite-tsconfig-paths' +import tsconfigPaths from "vite-tsconfig-paths"; export default defineConfig({ - plugins: [tsconfigPaths()] + plugins: [tsconfigPaths()] }); From 2c83b798816660814699ca77085230fbd61d28f5 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 31 May 2025 12:23:01 +0800 Subject: [PATCH 7/7] update: termination condition to time-based in classifyVideosWorker --- packages/crawler/mq/exec/classifyVideo.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index 4a4a58a..50a760b 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -51,14 +51,14 @@ export const classifyVideosWorker = async () => { const videos = await getUnlabelledVideos(sql); logger.log(`Found ${videos.length} unlabelled videos`); - let i = 0; + const startTime = new Date().getTime(); for (const aid of videos) { - if (i > 200) { + const now = new Date().getTime(); + if (now - startTime > 4.2 * MINUTE) { await lockManager.releaseLock("classifyVideos"); - return 10000 + i; + return 1; } await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) }); - i++; } await lockManager.releaseLock("classifyVideos"); return 0;
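
Note on PATCH 7/7: the worker now stops on a wall-clock budget instead of an item count. A minimal standalone sketch of the same pattern, assuming only that MINUTE matches @core/const/time.ts (60 000 ms); the helper and its names below are illustrative, not part of the codebase:

	// Time-budgeted drain: process items until a wall-clock budget is spent,
	// mirroring the `now - startTime > 4.2 * MINUTE` check in classifyVideosWorker.
	const MINUTE = 60 * 1000; // assumed to match @core/const/time.ts

	async function drainWithBudget<T>(
		items: T[],
		budgetMs: number,
		handle: (item: T) => Promise<void>
	): Promise<number> {
		const startTime = Date.now();
		for (const item of items) {
			// Check the clock before each unit of work, as the worker does,
			// so no new item is started after the budget is exhausted.
			if (Date.now() - startTime > budgetMs) {
				return 1; // nonzero signals an early, budget-driven stop
			}
			await handle(item);
		}
		return 0; // drained everything within budget
	}

	// Usage mirroring the worker's 4.2-minute budget (queue call shown for illustration):
	// await drainWithBudget(videos, 4.2 * MINUTE, (aid) => ClassifyVideoQueue.add("classifyVideo", { aid }));

The time-based cutoff keeps each scheduled run bounded even when per-item latency varies, which the fixed `i > 200` count could not guarantee.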