From e0a19499e1e9b497b945f46ff98be593799bafe2 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 14 Apr 2025 05:20:35 +0800 Subject: [PATCH] ref: move some worker functions into individual files --- .../mq/exec/dispatchMilestoneSnapshots.ts | 29 ++++++++++ .../mq/exec/dispatchRegularSnapshots.ts | 39 +++++++++++++ packages/crawler/mq/exec/executors.ts | 2 + packages/crawler/mq/exec/snapshotTick.ts | 56 ------------------- packages/crawler/mq/init.ts | 2 +- packages/crawler/src/worker.ts | 10 ++-- 6 files changed, 76 insertions(+), 62 deletions(-) create mode 100644 packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts create mode 100644 packages/crawler/mq/exec/dispatchRegularSnapshots.ts diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts new file mode 100644 index 0000000..3be7d0e --- /dev/null +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -0,0 +1,29 @@ +import { Job } from "npm:bullmq@5.45.2"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { getVideosNearMilestone } from "db/snapshot.ts"; +import { getAdjustedShortTermETA } from "mq/scheduling.ts"; +import { truncate } from "utils/truncate.ts"; +import { scheduleSnapshot } from "db/snapshotSchedule.ts"; +import logger from "log/logger.ts"; +import { HOUR, MINUTE, SECOND } from "@std/datetime"; + +export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise => + withDbConnection(async (client: Client) => { + const videos = await getVideosNearMilestone(client); + for (const video of videos) { + const aid = Number(video.aid); + const eta = await getAdjustedShortTermETA(client, aid); + if (eta > 144) continue; + const now = Date.now(); + const scheduledNextSnapshotDelay = eta * HOUR; + const maxInterval = 4 * HOUR; + const minInterval = 1 * SECOND; + const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); + const targetTime = now + delay; + await scheduleSnapshot(client, aid, "milestone", targetTime); + logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq"); + } + }, (e) => { + logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); + }); diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts new file mode 100644 index 0000000..7d7f1c2 --- /dev/null +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -0,0 +1,39 @@ +import { Job } from "npm:bullmq@5.45.2"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { getLatestVideoSnapshot } from "db/snapshot.ts"; +import { truncate } from "utils/truncate.ts"; +import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; +import logger from "log/logger.ts"; +import { HOUR, MINUTE, WEEK } from "@std/datetime"; +import { lockManager } from "../lockManager.ts"; +import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts"; + +export const dispatchRegularSnapshotsWorker = (_job: Job): Promise => + withDbConnection(async (client: Client) => { + const startedAt = Date.now(); + if (await lockManager.isLocked("dispatchRegularSnapshots")) { + logger.log("dispatchRegularSnapshots is already running", "mq"); + return; + } + await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); + + const aids = await getVideosWithoutActiveSnapshotSchedule(client); + for (const rawAid of aids) { + const aid = Number(rawAid); + const latestSnapshot = await getLatestVideoSnapshot(client, aid); + const now = Date.now(); + const lastSnapshotedAt = latestSnapshot?.time ?? now; + const interval = await getRegularSnapshotInterval(client, aid); + logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); + const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK); + await scheduleSnapshot(client, aid, "normal", targetTime); + if (now - startedAt > 25 * MINUTE) { + return; + } + } + }, (e) => { + logger.error(e as Error, "mq", "fn:regularSnapshotsWorker"); + }, async () => { + await lockManager.releaseLock("dispatchRegularSnapshots"); + }); diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index 70d4191..63b353a 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -3,3 +3,5 @@ export * from "./getVideoInfo.ts"; export * from "./collectSongs.ts"; export * from "./takeBulkSnapshot.ts"; export * from "./archiveSnapshots.ts"; +export * from "./dispatchMilestoneSnapshots.ts"; +export * from "./dispatchRegularSnapshots.ts"; \ No newline at end of file diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index a3b413d..d7f774b 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -104,62 +104,6 @@ export const closetMilestone = (views: number) => { return 10000000; }; -export const collectMilestoneSnapshotsWorker = async (_job: Job) => { - const client = await db.connect(); - try { - const videos = await getVideosNearMilestone(client); - for (const video of videos) { - const aid = Number(video.aid); - const eta = await getAdjustedShortTermETA(client, aid); - if (eta > 144) continue; - const now = Date.now(); - const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 4 * HOUR; - const minInterval = 1 * SECOND; - const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); - const targetTime = now + delay; - await scheduleSnapshot(client, aid, "milestone", targetTime); - logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq"); - } - } catch (e) { - logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); - } finally { - client.release(); - } -}; - -export const regularSnapshotsWorker = async (_job: Job) => { - const client = await db.connect(); - const startedAt = Date.now(); - if (await lockManager.isLocked("dispatchRegularSnapshots")) { - logger.log("dispatchRegularSnapshots is already running", "mq"); - client.release(); - return; - } - await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); - try { - const aids = await getVideosWithoutActiveSnapshotSchedule(client); - for (const rawAid of aids) { - const aid = Number(rawAid); - const latestSnapshot = await getLatestVideoSnapshot(client, aid); - const now = Date.now(); - const lastSnapshotedAt = latestSnapshot?.time ?? now; - const interval = await getRegularSnapshotInterval(client, aid); - logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); - const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK); - await scheduleSnapshot(client, aid, "normal", targetTime); - if (now - startedAt > 25 * MINUTE) { - return; - } - } - } catch (e) { - logger.error(e as Error, "mq", "fn:regularSnapshotsWorker"); - } finally { - await lockManager.releaseLock("dispatchRegularSnapshots"); - client.release(); - } -}; - export const takeSnapshotForVideoWorker = async (job: Job) => { const id = job.data.id; const aid = Number(job.data.aid); diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index 18703b7..484f60b 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -45,7 +45,7 @@ export async function initMQ() { }, }); - await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { + await SnapshotQueue.upsertJobScheduler("dispatchMilestoneSnapshots", { every: 5 * MINUTE, immediately: true, }); diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 51a84e1..df3d4fc 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -5,6 +5,8 @@ import { getLatestVideosWorker, getVideoInfoWorker, takeBulkSnapshotForVideosWorker, + dispatchMilestoneSnapshotsWorker, + dispatchRegularSnapshotsWorker } from "mq/exec/executors.ts"; import { redis } from "@core/db/redis.ts"; import logger from "log/logger.ts"; @@ -12,8 +14,6 @@ import { lockManager } from "mq/lockManager.ts"; import { WorkerError } from "mq/schema.ts"; import { bulkSnapshotTickWorker, - collectMilestoneSnapshotsWorker, - regularSnapshotsWorker, scheduleCleanupWorker, snapshotTickWorker, takeSnapshotForVideoWorker, @@ -86,10 +86,10 @@ const snapshotWorker = new Worker( return await takeSnapshotForVideoWorker(job); case "snapshotTick": return await snapshotTickWorker(job); - case "collectMilestoneSnapshots": - return await collectMilestoneSnapshotsWorker(job); + case "dispatchMilestoneSnapshots": + return await dispatchMilestoneSnapshotsWorker(job); case "dispatchRegularSnapshots": - return await regularSnapshotsWorker(job); + return await dispatchRegularSnapshotsWorker(job); case "scheduleCleanup": return await scheduleCleanupWorker(job); case "bulkSnapshotVideo":