diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index c4882c4..8fd7247 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -20,6 +20,7 @@ import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts"; import { setBiliVideoStatus } from "lib/db/allData.ts"; import { truncate } from "lib/utils/truncate.ts"; +import { lockManager } from "lib/mq/lockManager.ts"; const priorityMap: { [key: string]: number } = { "milestone": 1, @@ -132,6 +133,7 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { export const regularSnapshotsWorker = async (_job: Job) => { const client = await db.connect(); const startedAt = Date.now(); + await lockManager.acquireLock("dispatchRegularSnapshots"); try { const aids = await getVideosWithoutActiveSnapshotSchedule(client); for (const rawAid of aids) { @@ -148,6 +150,7 @@ export const regularSnapshotsWorker = async (_job: Job) => { } catch (e) { logger.error(e as Error, "mq", "fn:regularSnapshotsWorker"); } finally { + lockManager.releaseLock("dispatchRegularSnapshots"); client.release(); } }; diff --git a/src/worker.ts b/src/worker.ts index 8198d09..aa11c25 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -91,3 +91,7 @@ snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); }); + +snapshotWorker.on("closed", async () => { + await lockManager.releaseLock("dispatchRegularSnapshots"); +})