diff --git a/bun.lock b/bun.lock index e482ba5..12eff82 100644 --- a/bun.lock +++ b/bun.lock @@ -3,6 +3,15 @@ "workspaces": { "": { "name": "cvsa", + "dependencies": { + "postgres": "^3.4.5", + }, + "devDependencies": { + "prettier": "^3.5.3", + "vite-tsconfig-paths": "^5.1.4", + "vitest": "^3.1.2", + "vitest-tsconfig-paths": "^3.4.1", + }, }, "packages/backend": { "name": "@cvsa/backend", diff --git a/package.json b/package.json index 673369c..2de6888 100644 --- a/package.json +++ b/package.json @@ -1,12 +1,21 @@ { "name": "cvsa", "version": "2.13.22", - "private": true, + "private": false, "type": "module", "workspaces": [ "packages/frontend", "packages/core", "packages/backend", "packages/crawler" - ] + ], + "dependencies": { + "postgres": "^3.4.5" + }, + "devDependencies": { + "vite-tsconfig-paths": "^5.1.4", + "vitest": "^3.1.2", + "vitest-tsconfig-paths": "^3.4.1", + "prettier": "^3.5.3" + } } diff --git a/packages/crawler/db/bilibili_metadata.ts b/packages/crawler/db/bilibili_metadata.ts index 5b83e9c..b6b363f 100644 --- a/packages/crawler/db/bilibili_metadata.ts +++ b/packages/crawler/db/bilibili_metadata.ts @@ -30,9 +30,9 @@ export async function insertVideoLabel(sql: Psql, aid: number, label: number) { } export async function getVideoInfoFromAllData(sql: Psql, aid: number) { - const rows = await sql` + const rows = await sql` SELECT * FROM bilibili_metadata WHERE aid = ${aid} - `; + `; const row = rows[0]; let authorInfo = ""; if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) { diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts index 16df13c..0b4c5fe 100644 --- a/packages/crawler/db/snapshot.ts +++ b/packages/crawler/db/snapshot.ts @@ -22,7 +22,7 @@ export async function getVideosNearMilestone(sql: Psql) { return queryResult.map((row) => { return { ...row, - aid: Number(row.aid), + aid: Number(row.aid) }; }); } @@ -40,7 +40,7 @@ export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise 0; } -export async function videoHasActiveSchedule(sql: Psql, aid: number) { - const rows = await sql<{ status: string }[]>` - SELECT status - FROM snapshot_schedule - WHERE aid = ${aid} - AND (status = 'pending' - OR status = 'processing' - ) - ` - return rows.length > 0; -} - export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) { const rows = await sql<{ status: string }[]>` SELECT status FROM snapshot_schedule @@ -91,7 +79,7 @@ export async function videoHasProcessingSchedule(sql: Psql, aid: number) { FROM snapshot_schedule WHERE aid = ${aid} AND status = 'processing' - ` + `; return rows.length > 0; } @@ -102,7 +90,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n WHERE aid = ANY(${aids}) AND status != 'processing' GROUP BY aid - ` + `; return rows.map((row) => Number(row.aid)); } @@ -292,23 +280,23 @@ export async function adjustSnapshotTime( } export async function getSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` - SELECT * - FROM snapshot_schedule - WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' - ORDER BY - CASE - WHEN type = 'milestone' THEN 0 - ELSE 1 - END, - started_at - LIMIT 10; - ` - return rows; + return sql` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '1 seconds' + AND status = 'pending' + AND type != 'normal' + ORDER BY CASE + WHEN type = 'milestone' THEN 0 + ELSE 1 + END, + started_at + LIMIT 10; + `; } export async function getBulkSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` + return sql` SELECT * FROM snapshot_schedule WHERE (started_at <= NOW() + INTERVAL '15 seconds') @@ -320,27 +308,33 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) { END, started_at LIMIT 1000; - ` - return rows; + `; } export async function setSnapshotStatus(sql: Psql, id: number, status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id} + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ${id} `; } export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids}) + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ANY (${ids}) `; } -export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) { +export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) { const rows = await sql<{ aid: string }[]>` SELECT s.aid FROM songs s - LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + LEFT JOIN snapshot_schedule ss ON + s.aid = ss.aid AND + (ss.status = 'pending' OR ss.status = 'processing') AND + ss.type = ${type} WHERE ss.aid IS NULL `; return rows.map((r) => Number(r.aid)); @@ -352,6 +346,6 @@ export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) { FROM bilibili_metadata s LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') WHERE ss.aid IS NULL - ` + `; return rows.map((r) => Number(r.aid)); } diff --git a/packages/crawler/db/songs.ts b/packages/crawler/db/songs.ts index ebdd08c..ee4f042 100644 --- a/packages/crawler/db/songs.ts +++ b/packages/crawler/db/songs.ts @@ -2,7 +2,7 @@ import type { Psql } from "@core/db/psql.d.ts"; import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts"; export async function getNotCollectedSongs(sql: Psql) { - const rows = await sql<{ aid: number }[]>` + const rows = await sql<{ aid: number }[]>` SELECT lr.aid FROM labelling_result lr WHERE lr.label != 0 @@ -12,28 +12,28 @@ export async function getNotCollectedSongs(sql: Psql) { WHERE s.aid = lr.aid ); `; - return rows.map((row) => row.aid); + return rows.map((row) => row.aid); } export async function aidExistsInSongs(sql: Psql, aid: number) { - const rows = await sql<{ exists: boolean }[]>` + const rows = await sql<{ exists: boolean }[]>` SELECT EXISTS ( SELECT 1 FROM songs WHERE aid = ${aid} ); `; - return rows[0].exists; + return rows[0].exists; } export async function getSongsPublihsedAt(sql: Psql, aid: number) { - const rows = await sql<{ published_at: string }[]>` + const rows = await sql<{ published_at: string }[]>` SELECT published_at FROM songs WHERE aid = ${aid}; `; - if (rows.length === 0) { - return null; - } - return parseTimestampFromPsql(rows[0].published_at); -} \ No newline at end of file + if (rows.length === 0) { + return null; + } + return parseTimestampFromPsql(rows[0].published_at); +} diff --git a/packages/crawler/ml/akari.ts b/packages/crawler/ml/akari.ts index d76317f..ebf085f 100644 --- a/packages/crawler/ml/akari.ts +++ b/packages/crawler/ml/akari.ts @@ -16,8 +16,8 @@ class AkariProto extends AIManager { constructor() { super(); this.models = { - "classifier": onnxClassifierPath, - "embedding": onnxEmbeddingPath, + classifier: onnxClassifierPath, + embedding: onnxEmbeddingPath }; } @@ -55,7 +55,7 @@ class AkariProto extends AIManager { const { input_ids } = await tokenizer(texts, { add_special_tokens: false, - return_tensor: false, + return_tensor: false }); const cumsum = (arr: number[]): number[] => @@ -66,9 +66,9 @@ class AkariProto extends AIManager { const inputs = { input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ - flattened_input_ids.length, + flattened_input_ids.length ]), - offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), + offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]) }; const { embeddings } = await session.run(inputs); @@ -77,21 +77,14 @@ class AkariProto extends AIManager { private async runClassification(embeddings: number[]): Promise { const session = this.getModelSession("classifier"); - const inputTensor = new ort.Tensor( - Float32Array.from(embeddings), - [1, 3, 1024], - ); + const inputTensor = new ort.Tensor(Float32Array.from(embeddings), [1, 3, 1024]); const { logits } = await session.run({ channel_features: inputTensor }); return this.softmax(logits.data as Float32Array); } public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise { - const embeddings = await this.getJinaEmbeddings1024([ - title, - description, - tags, - ]); + const embeddings = await this.getJinaEmbeddings1024([title, description, tags]); const probabilities = await this.runClassification(embeddings); if (aid) { logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml"); diff --git a/packages/crawler/ml/const.ts b/packages/crawler/ml/const.ts index 4eb1525..a2f1ea9 100644 --- a/packages/crawler/ml/const.ts +++ b/packages/crawler/ml/const.ts @@ -1 +1 @@ -export const AkariModelVersion = "3.17"; \ No newline at end of file +export const AkariModelVersion = "3.17"; diff --git a/packages/crawler/ml/manager.ts b/packages/crawler/ml/manager.ts index d876844..d193bc6 100644 --- a/packages/crawler/ml/manager.ts +++ b/packages/crawler/ml/manager.ts @@ -6,8 +6,7 @@ export class AIManager { public sessions: { [key: string]: ort.InferenceSession } = {}; public models: { [key: string]: string } = {}; - constructor() { - } + constructor() {} public async init() { const modelKeys = Object.keys(this.models); diff --git a/packages/crawler/mq/exec/archiveSnapshots.ts b/packages/crawler/mq/exec/archiveSnapshots.ts index c667416..f6e67ef 100644 --- a/packages/crawler/mq/exec/archiveSnapshots.ts +++ b/packages/crawler/mq/exec/archiveSnapshots.ts @@ -3,9 +3,26 @@ import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/ import logger from "@core/log/logger.ts"; import { lockManager } from "@core/mq/lockManager.ts"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; -import { HOUR, MINUTE } from "@core/const/time.ts"; +import { MINUTE } from "@core/const/time.ts"; import { sql } from "@core/db/dbNew"; +function getNextSaturdayMidnightTimestamp(): number { + const now = new Date(); + const currentDay = now.getDay(); + + let daysUntilNextSaturday = (6 - currentDay + 7) % 7; + + if (daysUntilNextSaturday === 0) { + daysUntilNextSaturday = 7; + } + + const nextSaturday = new Date(now); + nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday); + nextSaturday.setHours(0, 0, 0, 0); + + return nextSaturday.getTime(); +} + export const archiveSnapshotsWorker = async (_job: Job) => { try { const startedAt = Date.now(); @@ -20,15 +37,16 @@ export const archiveSnapshotsWorker = async (_job: Job) => { const latestSnapshot = await getLatestVideoSnapshot(sql, aid); const now = Date.now(); const lastSnapshotedAt = latestSnapshot?.time ?? now; - const interval = 168; + const nextSatMidnight = getNextSaturdayMidnightTimestamp(); + const interval = nextSatMidnight - now; logger.log( `Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker" ); - const targetTime = lastSnapshotedAt + interval * HOUR; + const targetTime = lastSnapshotedAt + interval; await scheduleSnapshot(sql, aid, "archive", targetTime); - if (now - startedAt > 250 * MINUTE) { + if (now - startedAt > 30 * MINUTE) { return; } } diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index 6c3d37c..50a760b 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -34,7 +34,7 @@ export const classifyVideoWorker = async (job: Job) => { await job.updateData({ ...job.data, - label: label, + label: label }); return 0; @@ -46,19 +46,19 @@ export const classifyVideosWorker = async () => { return; } - await lockManager.acquireLock("classifyVideos"); + await lockManager.acquireLock("classifyVideos", 5 * 60); const videos = await getUnlabelledVideos(sql); logger.log(`Found ${videos.length} unlabelled videos`); - let i = 0; + const startTime = new Date().getTime(); for (const aid of videos) { - if (i > 200) { + const now = new Date().getTime(); + if (now - startTime > 4.2 * MINUTE) { await lockManager.releaseLock("classifyVideos"); - return 10000 + i; + return 1; } await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) }); - i++; } await lockManager.releaseLock("classifyVideos"); return 0; diff --git a/packages/crawler/mq/exec/collectSongs.ts b/packages/crawler/mq/exec/collectSongs.ts index bd58179..b354c2b 100644 --- a/packages/crawler/mq/exec/collectSongs.ts +++ b/packages/crawler/mq/exec/collectSongs.ts @@ -1,7 +1,7 @@ import { Job } from "bullmq"; import { collectSongs } from "mq/task/collectSongs.ts"; -export const collectSongsWorker = async (_job: Job): Promise =>{ +export const collectSongsWorker = async (_job: Job): Promise => { await collectSongs(); return; -} \ No newline at end of file +}; diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 2cbcd08..1ed8b26 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { if (eta > 144) continue; const now = Date.now(); const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 1 * HOUR; - const minInterval = 1 * SECOND; + const maxInterval = 1.2 * HOUR; + const minInterval = 2 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; await scheduleSnapshot(sql, aid, "milestone", targetTime); @@ -25,5 +25,5 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { } } catch (e) { logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); - }; -} \ No newline at end of file + } +}; diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index 49a7893..9b47b03 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -1,7 +1,7 @@ import { Job } from "bullmq"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { truncate } from "utils/truncate.ts"; -import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, WEEK } from "@core/const/time.ts"; import { lockManager } from "@core/mq/lockManager.ts"; @@ -17,7 +17,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise = } await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); - const aids = await getVideosWithoutActiveSnapshotSchedule(sql); + const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal"); for (const rawAid of aids) { const aid = Number(rawAid); const latestSnapshot = await getLatestVideoSnapshot(sql, aid); diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index f7b9cf8..c2969ed 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -7,4 +7,4 @@ export * from "./dispatchMilestoneSnapshots.ts"; export * from "./dispatchRegularSnapshots.ts"; export * from "./snapshotVideo.ts"; export * from "./scheduleCleanup.ts"; -export * from "./snapshotTick.ts"; \ No newline at end of file +export * from "./snapshotTick.ts"; diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index b3d93f1..f2f4351 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -2,6 +2,6 @@ import { sql } from "@core/db/dbNew"; import { Job } from "bullmq"; import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; -export const getLatestVideosWorker = async (_job: Job): Promise =>{ +export const getLatestVideosWorker = async (_job: Job): Promise => { await queueLatestVideos(sql); -} +}; diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts index ee96b2d..889bd7e 100644 --- a/packages/crawler/mq/exec/getVideoInfo.ts +++ b/packages/crawler/mq/exec/getVideoInfo.ts @@ -10,4 +10,4 @@ export const getVideoInfoWorker = async (job: Job): Promise => { return; } await insertVideoInfo(sql, aid); -} +}; diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index d599fca..ca596c8 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -5,15 +5,15 @@ import { getBulkSnapshotsInNextSecond, getSnapshotsInNextSecond, setSnapshotStatus, - videoHasProcessingSchedule, + videoHasProcessingSchedule } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { SnapshotQueue } from "mq/index.ts"; import { sql } from "@core/db/dbNew"; const priorityMap: { [key: string]: number } = { - "milestone": 1, - "normal": 3, + milestone: 1, + normal: 3 }; export const bulkSnapshotTickWorker = async (_job: Job) => { @@ -35,12 +35,16 @@ export const bulkSnapshotTickWorker = async (_job: Job) => { created_at: schedule.created_at, started_at: schedule.started_at, finished_at: schedule.finished_at, - status: schedule.status, + status: schedule.status }; }); - await SnapshotQueue.add("bulkSnapshotVideo", { - schedules: schedulesData, - }, { priority: 3 }); + await SnapshotQueue.add( + "bulkSnapshotVideo", + { + schedules: schedulesData + }, + { priority: 3 } + ); } return `OK`; } catch (e) { @@ -61,11 +65,15 @@ export const snapshotTickWorker = async (_job: Job) => { } const aid = Number(schedule.aid); await setSnapshotStatus(sql, schedule.id, "processing"); - await SnapshotQueue.add("snapshotVideo", { - aid: Number(aid), - id: Number(schedule.id), - type: schedule.type ?? "normal", - }, { priority }); + await SnapshotQueue.add( + "snapshotVideo", + { + aid: Number(aid), + id: Number(schedule.id), + type: schedule.type ?? "normal" + }, + { priority } + ); } return `OK`; } catch (e) { diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index 59f05db..8d8d241 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -2,7 +2,6 @@ import { Job } from "bullmq"; import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, SECOND } from "@core/const/time.ts"; -import { lockManager } from "@core/mq/lockManager.ts"; import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { getSongsPublihsedAt } from "db/songs.ts"; @@ -11,9 +10,9 @@ import { NetSchedulerError } from "@core/net/delegate.ts"; import { sql } from "@core/db/dbNew.ts"; const snapshotTypeToTaskMap: { [key: string]: string } = { - "milestone": "snapshotMilestoneVideo", - "normal": "snapshotVideo", - "new": "snapshotMilestoneVideo", + milestone: "snapshotMilestoneVideo", + normal: "snapshotVideo", + new: "snapshotMilestoneVideo" }; export const snapshotVideoWorker = async (job: Job): Promise => { @@ -32,7 +31,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Video ${aid} has status ${status} in the database. Abort snapshoting.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:dispatchRegularSnapshotsWorker" ); return; } @@ -44,7 +43,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Bilibili return status ${status} when snapshoting for ${aid}.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:dispatchRegularSnapshotsWorker" ); return; } @@ -52,7 +51,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { if (type === "new") { const publihsedAt = await getSongsPublihsedAt(sql, aid); const timeSincePublished = stat.time - publihsedAt!; - const viewsPerHour = stat.views / timeSincePublished * HOUR; + const viewsPerHour = (stat.views / timeSincePublished) * HOUR; if (timeSincePublished > 48 * HOUR) { return; } @@ -78,40 +77,31 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:snapshotVideoWorker" ); + return; } const now = Date.now(); const targetTime = now + eta * HOUR; await scheduleSnapshot(sql, aid, type, targetTime); await setSnapshotStatus(sql, id, "completed"); return; - } - catch (e) { + } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { - logger.warn( - `No available proxy for aid ${job.data.aid}.`, - "mq", - "fn:takeSnapshotForVideoWorker", - ); + logger.warn(`No available proxy for aid ${job.data.aid}.`, "mq", "fn:snapshotVideoWorker"); await setSnapshotStatus(sql, id, "no_proxy"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); return; - } - else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") { + } else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") { logger.warn( `Failed to proxy request for aid ${job.data.aid}: ${e.message}`, "mq", - "fn:takeSnapshotForVideoWorker", + "fn:snapshotVideoWorker" ); await setSnapshotStatus(sql, id, "failed"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); } - logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + logger.error(e as Error, "mq", "fn:snapshotVideoWorker"); await setSnapshotStatus(sql, id, "failed"); } - finally { - await lockManager.releaseLock("dispatchRegularSnapshots"); - }; - return; }; diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index a71add1..afaf239 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -3,7 +3,7 @@ import { bulkScheduleSnapshot, bulkSetSnapshotStatus, scheduleSnapshot, - snapshotScheduleExists, + snapshotScheduleExists } from "db/snapshotSchedule.ts"; import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts"; import logger from "@core/log/logger.ts"; @@ -55,7 +55,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { ${shares}, ${favorites} ) - ` + `; logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker"); } @@ -72,11 +72,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { return `DONE`; } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { - logger.warn( - `No available proxy for bulk request now.`, - "mq", - "fn:takeBulkSnapshotForVideosWorker", - ); + logger.warn(`No available proxy for bulk request now.`, "mq", "fn:takeBulkSnapshotForVideosWorker"); await bulkSetSnapshotStatus(sql, ids, "no_proxy"); await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random()); return; diff --git a/packages/crawler/mq/index.ts b/packages/crawler/mq/index.ts index 62d5d86..0644d05 100644 --- a/packages/crawler/mq/index.ts +++ b/packages/crawler/mq/index.ts @@ -2,13 +2,13 @@ import { Queue, ConnectionOptions } from "bullmq"; import { redis } from "@core/db/redis.ts"; export const LatestVideosQueue = new Queue("latestVideos", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); export const ClassifyVideoQueue = new Queue("classifyVideo", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); export const SnapshotQueue = new Queue("snapshot", { - connection: redis as ConnectionOptions + connection: redis as ConnectionOptions }); diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index f646c08..e6c5e52 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -62,8 +62,8 @@ export async function initMQ() { }); await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", { - every: 6 * HOUR, - immediately: true + every: 2 * HOUR, + immediately: false }); await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index 3d25515..dcc8ad3 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ -33,7 +33,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { if (!snapshotsEnough) return 0; const currentTimestamp = new Date().getTime(); - const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; + const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; diff --git a/packages/crawler/mq/task/collectSongs.ts b/packages/crawler/mq/task/collectSongs.ts index dcf5472..af3178e 100644 --- a/packages/crawler/mq/task/collectSongs.ts +++ b/packages/crawler/mq/task/collectSongs.ts @@ -25,5 +25,5 @@ export async function insertIntoSongs(sql: Psql, aid: number) { (SELECT duration FROM bilibili_metadata WHERE aid = ${aid}) ) ON CONFLICT DO NOTHING - ` + `; } diff --git a/packages/crawler/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts index 1fc618a..cd67994 100644 --- a/packages/crawler/mq/task/getVideoDetails.ts +++ b/packages/crawler/mq/task/getVideoDetails.ts @@ -18,9 +18,9 @@ export async function insertVideoInfo(sql: Psql, aid: number) { const bvid = data.View.bvid; const desc = data.View.desc; const uid = data.View.owner.mid; - const tags = data.Tags - .filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) - .map((tag) => tag.tag_name).join(","); + const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) + .map((tag) => tag.tag_name) + .join(","); const title = data.View.title; const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const duration = data.View.duration; @@ -55,7 +55,7 @@ export async function insertVideoInfo(sql: Psql, aid: number) { ${stat.share}, ${stat.favorite} ) - ` + `; logger.log(`Inserted video metadata for aid: ${aid}`, "mq"); await ClassifyVideoQueue.add("classifyVideo", { aid }); diff --git a/packages/crawler/mq/task/getVideoStats.ts b/packages/crawler/mq/task/getVideoStats.ts index ffec09f..afd5b72 100644 --- a/packages/crawler/mq/task/getVideoStats.ts +++ b/packages/crawler/mq/task/getVideoStats.ts @@ -24,11 +24,7 @@ export interface SnapshotNumber { * - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` */ -export async function insertVideoSnapshot( - sql: Psql, - aid: number, - task: string, -): Promise { +export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise { const data = await getVideoInfo(aid, task); if (typeof data == "number") { return data; @@ -42,10 +38,10 @@ export async function insertVideoSnapshot( const shares = data.stat.share; const favorites = data.stat.favorite; - await sql` + await sql` INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites}) - ` + `; logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); @@ -58,6 +54,6 @@ export async function insertVideoSnapshot( coins, shares, favorites, - time, + time }; } diff --git a/packages/crawler/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts index 9f583c2..193787a 100644 --- a/packages/crawler/mq/task/queueLatestVideo.ts +++ b/packages/crawler/mq/task/queueLatestVideo.ts @@ -6,9 +6,7 @@ import logger from "@core/log/logger.ts"; import { LatestVideosQueue } from "mq/index.ts"; import type { Psql } from "@core/db/psql.d.ts"; -export async function queueLatestVideos( - sql: Psql, -): Promise { +export async function queueLatestVideos(sql: Psql): Promise { let page = 1; let i = 0; const videosFound = new Set(); @@ -26,14 +24,18 @@ export async function queueLatestVideos( if (videoExists) { continue; } - await LatestVideosQueue.add("getVideoInfo", { aid }, { - delay, - attempts: 100, - backoff: { - type: "fixed", - delay: SECOND * 5, - }, - }); + await LatestVideosQueue.add( + "getVideoInfo", + { aid }, + { + delay, + attempts: 100, + backoff: { + type: "fixed", + delay: SECOND * 5 + } + } + ); videosFound.add(aid); allExists = false; delay += Math.random() * SECOND * 1.5; @@ -42,7 +44,7 @@ export async function queueLatestVideos( logger.log( `Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`, "net", - "fn:queueLatestVideos()", + "fn:queueLatestVideos()" ); if (allExists) { return 0; diff --git a/packages/crawler/mq/task/regularSnapshotInterval.ts b/packages/crawler/mq/task/regularSnapshotInterval.ts index e7db224..306865a 100644 --- a/packages/crawler/mq/task/regularSnapshotInterval.ts +++ b/packages/crawler/mq/task/regularSnapshotInterval.ts @@ -1,6 +1,6 @@ import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts"; import { HOUR } from "@core/const/time.ts"; -import type { Psql } from "@core/db/psql"; +import type { Psql } from "@core/db/psql.d.ts"; export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => { const now = Date.now(); diff --git a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts index ccca69a..325f08b 100644 --- a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts +++ b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts @@ -2,14 +2,10 @@ import { sql } from "@core/db/dbNew"; import logger from "@core/log/logger.ts"; export async function removeAllTimeoutSchedules() { - logger.log( - "Too many timeout schedules, directly removing these schedules...", - "mq", - "fn:scheduleCleanupWorker", - ); + logger.log("Too many timeout schedules, directly removing these schedules...", "mq", "fn:scheduleCleanupWorker"); return await sql` DELETE FROM snapshot_schedule WHERE status IN ('pending', 'processing') AND started_at < NOW() - INTERVAL '30 minutes' `; -} \ No newline at end of file +} diff --git a/packages/crawler/package.json b/packages/crawler/package.json index 82650c1..dfdfcd1 100644 --- a/packages/crawler/package.json +++ b/packages/crawler/package.json @@ -7,7 +7,8 @@ "worker:filter": "bun run ./build/filterWorker.js", "adder": "bun run ./src/jobAdder.ts", "bullui": "bun run ./src/bullui.ts", - "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run bullui' 'bun run worker:filter'" + "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'", + "format": "prettier --write ." }, "devDependencies": { "concurrently": "^9.1.2" diff --git a/packages/crawler/src/build.ts b/packages/crawler/src/build.ts index 942672c..42b7210 100644 --- a/packages/crawler/src/build.ts +++ b/packages/crawler/src/build.ts @@ -3,13 +3,12 @@ import Bun from "bun"; await Bun.build({ entrypoints: ["./src/filterWorker.ts"], outdir: "./build", - target: "node" + target: "node" }); - const file = Bun.file("./build/filterWorker.js"); const code = await file.text(); const modifiedCode = code.replaceAll("../bin/napi-v3/", "../../../node_modules/onnxruntime-node/bin/napi-v3/"); -await Bun.write("./build/filterWorker.js", modifiedCode); \ No newline at end of file +await Bun.write("./build/filterWorker.js", modifiedCode); diff --git a/packages/crawler/src/bullui.ts b/packages/crawler/src/bullui.ts index 5765540..8f2dcc7 100644 --- a/packages/crawler/src/bullui.ts +++ b/packages/crawler/src/bullui.ts @@ -11,9 +11,9 @@ createBullBoard({ queues: [ new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(ClassifyVideoQueue), - new BullMQAdapter(SnapshotQueue), + new BullMQAdapter(SnapshotQueue) ], - serverAdapter: serverAdapter, + serverAdapter: serverAdapter }); const app = express(); diff --git a/packages/crawler/src/filterWorker.ts b/packages/crawler/src/filterWorker.ts index ab85cba..c740336 100644 --- a/packages/crawler/src/filterWorker.ts +++ b/packages/crawler/src/filterWorker.ts @@ -7,13 +7,13 @@ import { lockManager } from "@core/mq/lockManager.ts"; import Akari from "ml/akari.ts"; const shutdown = async (signal: string) => { - logger.log(`${signal} Received: Shutting down workers...`, "mq"); - await filterWorker.close(true); - process.exit(0); + logger.log(`${signal} Received: Shutting down workers...`, "mq"); + await filterWorker.close(true); + process.exit(0); }; -process.on('SIGINT', () => shutdown('SIGINT')); -process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on("SIGINT", () => shutdown("SIGINT")); +process.on("SIGTERM", () => shutdown("SIGTERM")); await Akari.init(); @@ -29,7 +29,7 @@ const filterWorker = new Worker( break; } }, - { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }, + { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } } ); filterWorker.on("active", () => { diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 534f488..418c343 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -10,7 +10,7 @@ import { scheduleCleanupWorker, snapshotTickWorker, snapshotVideoWorker, - takeBulkSnapshotForVideosWorker, + takeBulkSnapshotForVideosWorker } from "mq/exec/executors.ts"; import { redis } from "@core/db/redis.ts"; import logger from "@core/log/logger.ts"; @@ -30,15 +30,15 @@ const releaseAllLocks = async () => { }; const shutdown = async (signal: string) => { - logger.log(`${signal} Received: Shutting down workers...`, "mq"); - await releaseAllLocks(); + logger.log(`${signal} Received: Shutting down workers...`, "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); - process.exit(0); + process.exit(0); }; -process.on('SIGINT', () => shutdown('SIGINT')); -process.on('SIGTERM', () => shutdown('SIGTERM')); +process.on("SIGINT", () => shutdown("SIGINT")); +process.on("SIGTERM", () => shutdown("SIGTERM")); const latestVideoWorker = new Worker( "latestVideos", @@ -58,8 +58,8 @@ const latestVideoWorker = new Worker( connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, - removeOnFail: { count: 0 }, - }, + removeOnFail: { count: 0 } + } ); latestVideoWorker.on("active", () => { @@ -95,7 +95,7 @@ const snapshotWorker = new Worker( break; } }, - { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }, + { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } } ); snapshotWorker.on("error", (err) => { diff --git a/packages/crawler/test/db/snapshotSchedule.test.ts b/packages/crawler/test/db/snapshotSchedule.test.ts index b0ff0c1..28de210 100644 --- a/packages/crawler/test/db/snapshotSchedule.test.ts +++ b/packages/crawler/test/db/snapshotSchedule.test.ts @@ -56,65 +56,65 @@ const databasePreparationQuery = ` CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status); CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type); CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id); -` +`; const cleanUpQuery = ` DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE; DROP TABLE IF EXISTS "snapshot_schedule" CASCADE; -` +`; async function testMocking() { - await sql.begin(async tx => { - await tx.unsafe(cleanUpQuery).simple(); + await sql.begin(async (tx) => { + await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(databasePreparationQuery).simple(); - - await tx` + + await tx` INSERT INTO snapshot_schedule - ${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} + ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")} `; - await tx` + await tx` ROLLBACK; - ` + `; - await tx.unsafe(cleanUpQuery).simple(); - return; + await tx.unsafe(cleanUpQuery).simple(); + return; }); } async function testBulkSetSnapshotStatus() { - return await sql.begin(async tx => { - await tx.unsafe(cleanUpQuery).simple(); - await tx.unsafe(databasePreparationQuery).simple(); + return await sql.begin(async (tx) => { + await tx.unsafe(cleanUpQuery).simple(); + await tx.unsafe(databasePreparationQuery).simple(); - await tx` + await tx` INSERT INTO snapshot_schedule - ${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} + ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")} `; const ids = [1, 2, 3]; - - await bulkSetSnapshotStatus(tx, ids, 'pending') - const rows = tx<{status: string}[]>` + await bulkSetSnapshotStatus(tx, ids, "pending"); + + const rows = tx<{ status: string }[]>` SELECT status FROM snapshot_schedule WHERE id = 1; `.execute(); - await tx` + await tx` ROLLBACK; - ` + `; - await tx.unsafe(cleanUpQuery).simple(); - return rows; + await tx.unsafe(cleanUpQuery).simple(); + return rows; }); } test("data mocking works", async () => { - await testMocking(); + await testMocking(); expect(() => {}).not.toThrowError(); }); test("bulkSetSnapshotStatus core logic works smoothly", async () => { - const rows = await testBulkSetSnapshotStatus(); - expect(rows.every(item => item.status === 'pending')).toBe(true); + const rows = await testBulkSetSnapshotStatus(); + expect(rows.every((item) => item.status === "pending")).toBe(true); }); diff --git a/packages/crawler/utils/formatTimestampToPostgre.ts b/packages/crawler/utils/formatTimestampToPostgre.ts index 8dc37b2..fa444b5 100644 --- a/packages/crawler/utils/formatTimestampToPostgre.ts +++ b/packages/crawler/utils/formatTimestampToPostgre.ts @@ -1,6 +1,6 @@ export function formatTimestampToPsql(timestamp: number) { const date = new Date(timestamp); - return date.toISOString().slice(0, 23).replace("T", " "); + return date.toISOString().slice(0, 23).replace("T", " ") + "+08"; } export function parseTimestampFromPsql(timestamp: string) { diff --git a/packages/crawler/vitest.config.ts b/packages/crawler/vitest.config.ts index fb5072b..63af38b 100644 --- a/packages/crawler/vitest.config.ts +++ b/packages/crawler/vitest.config.ts @@ -1,6 +1,6 @@ import { defineConfig } from "vitest/config"; -import tsconfigPaths from 'vite-tsconfig-paths' +import tsconfigPaths from "vite-tsconfig-paths"; export default defineConfig({ - plugins: [tsconfigPaths()] + plugins: [tsconfigPaths()] });