From 7689e687ff01714c4a959999895d756aa4830e5b Mon Sep 17 00:00:00 2001 From: alikia2x Date: Tue, 15 Apr 2025 03:50:03 +0800 Subject: [PATCH] ref: extract scheduleCleanup into individual file improve: logic of scheduleCleanup --- packages/crawler/mq/exec/executors.ts | 4 +- packages/crawler/mq/exec/scheduleCleanup.ts | 45 +++++++++++++++++++ packages/crawler/mq/exec/snapshotTick.ts | 32 ------------- packages/crawler/mq/init.ts | 2 +- .../mq/task/getTimeoutSchedulesCount.ts | 13 ++++++ .../mq/task/removeAllTimeoutSchedules.ts | 16 +++++++ packages/crawler/src/worker.ts | 4 +- 7 files changed, 81 insertions(+), 35 deletions(-) create mode 100644 packages/crawler/mq/exec/scheduleCleanup.ts create mode 100644 packages/crawler/mq/task/getTimeoutSchedulesCount.ts create mode 100644 packages/crawler/mq/task/removeAllTimeoutSchedules.ts diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index 3aab9f2..f7b9cf8 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -5,4 +5,6 @@ export * from "./takeBulkSnapshot.ts"; export * from "./archiveSnapshots.ts"; export * from "./dispatchMilestoneSnapshots.ts"; export * from "./dispatchRegularSnapshots.ts"; -export * from "./snapshotVideo.ts"; \ No newline at end of file +export * from "./snapshotVideo.ts"; +export * from "./scheduleCleanup.ts"; +export * from "./snapshotTick.ts"; \ No newline at end of file diff --git a/packages/crawler/mq/exec/scheduleCleanup.ts b/packages/crawler/mq/exec/scheduleCleanup.ts new file mode 100644 index 0000000..920458e --- /dev/null +++ b/packages/crawler/mq/exec/scheduleCleanup.ts @@ -0,0 +1,45 @@ +import { Job } from "npm:bullmq@5.45.2"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import logger from "log/logger.ts"; +import { scheduleSnapshot, setSnapshotStatus } from "db/snapshotSchedule.ts"; +import { SECOND } from "@std/datetime"; +import { getTimeoutSchedulesCount } from "mq/task/getTimeoutSchedulesCount.ts"; +import { removeAllTimeoutSchedules } from "mq/task/removeAllTimeoutSchedules.ts"; + +export const scheduleCleanupWorker = async (_job: Job): Promise => + await withDbConnection(async (client: Client) => { + if (await getTimeoutSchedulesCount(client) > 2000) { + await removeAllTimeoutSchedules(client); + return; + } + + const query: string = ` + SELECT id, aid, type + FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '30 minutes' + UNION + SELECT id, aid, type + FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '2 minutes' + AND type = 'milestone' + `; + const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query); + if (rows.length === 0) return; + for (const row of rows) { + const id = Number(row.id); + const aid = Number(row.aid); + const type = row.type; + await setSnapshotStatus(client, id, "timeout"); + await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND); + logger.log( + `Schedule ${id} has not received any response in a while, rescheduled.`, + "mq", + "fn:scheduleCleanupWorker", + ); + } + }, (e) => { + logger.error(e as Error, "mq", "fn:scheduleCleanupWorker"); + }); diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index ef6fa07..74b7598 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -5,11 +5,9 @@ import { bulkSetSnapshotStatus, getBulkSnapshotsInNextSecond, getSnapshotsInNextSecond, - scheduleSnapshot, setSnapshotStatus, videoHasProcessingSchedule, } from "db/snapshotSchedule.ts"; -import { SECOND } from "@std/datetime"; import logger from "log/logger.ts"; import { SnapshotQueue } from "mq/index.ts"; @@ -86,33 +84,3 @@ export const closetMilestone = (views: number) => { if (views < 1000000) return 1000000; return 10000000; }; - -export const scheduleCleanupWorker = async (_job: Job) => { - const client = await db.connect(); - try { - const query = ` - SELECT id, aid, type - FROM snapshot_schedule - WHERE status IN ('pending', 'processing') - AND started_at < NOW() - INTERVAL '30 minutes' - `; - const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query); - if (rows.length === 0) return; - for (const row of rows) { - const id = Number(row.id); - const aid = Number(row.aid); - const type = row.type; - await setSnapshotStatus(client, id, "timeout"); - await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND); - logger.log( - `Schedule ${id} has no response received for 5 minutes, rescheduled.`, - "mq", - "fn:scheduleCleanupWorker", - ); - } - } catch (e) { - logger.error(e as Error, "mq", "fn:scheduleCleanupWorker"); - } finally { - client.release(); - } -}; diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index 484f60b..518afe4 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -61,7 +61,7 @@ export async function initMQ() { }); await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { - every: 30 * MINUTE, + every: 2 * MINUTE, immediately: true, }); diff --git a/packages/crawler/mq/task/getTimeoutSchedulesCount.ts b/packages/crawler/mq/task/getTimeoutSchedulesCount.ts new file mode 100644 index 0000000..4e7bb14 --- /dev/null +++ b/packages/crawler/mq/task/getTimeoutSchedulesCount.ts @@ -0,0 +1,13 @@ +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; + +export async function getTimeoutSchedulesCount(client: Client) { + const query: string = ` + SELECT COUNT(id) + FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '30 minutes' + `; + + const { rows } = await client.queryObject<{ count: number }>(query); + return rows[0].count; +} diff --git a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts new file mode 100644 index 0000000..bb8c382 --- /dev/null +++ b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts @@ -0,0 +1,16 @@ +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import logger from "log/logger.ts"; + +export async function removeAllTimeoutSchedules(client: Client) { + logger.log( + "Too many timeout schedules, directly removing these schedules...", + "mq", + "fn:scheduleCleanupWorker", + ); + const query: string = ` + DELETE FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '30 minutes' + `; + await client.queryObject(query); +} \ No newline at end of file diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 84023fe..0f0c164 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -1,11 +1,14 @@ import { ConnectionOptions, Job, Worker } from "bullmq"; import { archiveSnapshotsWorker, + bulkSnapshotTickWorker, collectSongsWorker, dispatchMilestoneSnapshotsWorker, dispatchRegularSnapshotsWorker, getLatestVideosWorker, getVideoInfoWorker, + scheduleCleanupWorker, + snapshotTickWorker, snapshotVideoWorker, takeBulkSnapshotForVideosWorker, } from "mq/exec/executors.ts"; @@ -13,7 +16,6 @@ import { redis } from "@core/db/redis.ts"; import logger from "log/logger.ts"; import { lockManager } from "mq/lockManager.ts"; import { WorkerError } from "mq/schema.ts"; -import { bulkSnapshotTickWorker, scheduleCleanupWorker, snapshotTickWorker } from "mq/exec/snapshotTick.ts"; const releaseLockForJob = async (name: string) => { await lockManager.releaseLock(name);