From 7a6892ae8e3cc9ed50d89aed5b9b53fe4688553f Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Sun, 13 Apr 2025 22:22:41 +0800
Subject: [PATCH 01/12] fix: passing a reference causes a const object to be incorrectly changed

---
 packages/backend/root.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/backend/root.ts b/packages/backend/root.ts
index d8024c7..aa2a1e9 100644
--- a/packages/backend/root.ts
+++ b/packages/backend/root.ts
@@ -3,10 +3,10 @@ import { VERSION } from "./main.ts";
 import { createHandlers } from "./utils.ts";
 
 export const rootHandler = createHandlers((c) => {
-    let singer: Singer | Singer[] | null;
+    let singer: Singer | Singer[];
     const shouldShowSpecialSinger = Math.random() < 0.016;
     if (getSingerForBirthday().length !== 0) {
-        singer = getSingerForBirthday();
+        singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
         for (const s of singer) {
             delete s.birthday;
             s.message = `祝${s.name}生日快乐~`;

From d80a6bfcd90ecd948311e02670513497b978fc6f Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Sun, 13 Apr 2025 23:15:09 +0800
Subject: [PATCH 02/12] add: restart tasks when a process quits

---
 packages/crawler/deno.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/crawler/deno.json b/packages/crawler/deno.json
index 7ceed53..9509763 100644
--- a/packages/crawler/deno.json
+++ b/packages/crawler/deno.json
@@ -12,7 +12,7 @@
     "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
     "adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
     "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
-    "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
+    "all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
     "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
   },
   "lint": {

From 9ef513eed730c37bbbbd1fdec93da73fcaf90c4a Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Sun, 13 Apr 2025 23:55:43 +0800
Subject: [PATCH 03/12] update: add randomness to the re-scheduling interval when no proxy is available

---
 packages/crawler/mq/exec/snapshotTick.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index ac7f04e..39f9ca1 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -234,7 +234,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
                 "fn:takeBulkSnapshotForVideosWorker",
             );
             await bulkSetSnapshotStatus(client, ids, "completed");
-            await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
+            await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
             return;
         }
         logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");

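A note on PATCH 01/12: if getSingerForBirthday() hands back references into a module-level constant, deleting fields on the returned objects (as the handler does with birthday) silently corrupts that constant for every later request; deep-copying before mutation is what the patch adds. The sketch below reproduces that failure mode under stated assumptions: Singer, SPECIAL_SINGERS and the birthday value are illustrative stand-ins, not the project's actual definitions.

    // Minimal sketch of the aliasing bug and the deep-copy fix; all names here are
    // hypothetical stand-ins for whatever getSingerForBirthday() actually reads from.
    interface Singer {
        name: string;
        birthday?: string;
        message?: string;
    }

    const SPECIAL_SINGERS: Singer[] = [{ name: "Miku", birthday: "08-31" }];

    function getSingerForBirthday(): Singer[] {
        // Returns references into SPECIAL_SINGERS, not copies.
        return SPECIAL_SINGERS.filter((s) => s.birthday === "08-31");
    }

    // Fixed path (what the patch does): mutate a deep copy, the constant stays intact.
    const copied = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
    for (const s of copied) delete s.birthday;
    console.log(SPECIAL_SINGERS[0].birthday); // "08-31"

    // Buggy path (pre-patch): mutate the aliased references, the constant is damaged.
    const aliased = getSingerForBirthday();
    for (const s of aliased) delete s.birthday;
    console.log(SPECIAL_SINGERS[0].birthday); // undefined

    // structuredClone(getSingerForBirthday()) would also work for plain data like this.
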
From 1f6411b512e538e33df8d261881c5cc0959f07c2 Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:02:00 +0800
Subject: [PATCH 04/12] fix: add a lower limit when collecting videos near the 100k milestone

---
 packages/crawler/db/snapshot.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts
index 119e353..f8b65a8 100644
--- a/packages/crawler/db/snapshot.ts
+++ b/packages/crawler/db/snapshot.ts
@@ -7,7 +7,7 @@ export async function getVideosNearMilestone(client: Client) {
         SELECT ls.*
         FROM latest_video_snapshot ls
         WHERE
-            views < 100000 OR
+            (views >= 5000 AND views < 100000) OR
             (views >= 900000 AND views < 1000000) OR
             (views >= 9900000 AND views < 10000000)
     `);

From 10b761e3dbf6884f5f345d9d9376ea9f2107b0aa Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:07:50 +0800
Subject: [PATCH 05/12] improve: query for getVideosNearMilestone

---
 packages/crawler/db/snapshot.ts | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts
index f8b65a8..25c582d 100644
--- a/packages/crawler/db/snapshot.ts
+++ b/packages/crawler/db/snapshot.ts
@@ -6,8 +6,16 @@ export async function getVideosNearMilestone(client: Client) {
     const queryResult = await client.queryObject(`
         SELECT ls.*
         FROM latest_video_snapshot ls
+        RIGHT JOIN songs ON songs.aid = ls.aid
         WHERE
-            (views >= 5000 AND views < 100000) OR
+            (views >= 50000 AND views < 100000) OR
+            (views >= 900000 AND views < 1000000) OR
+            (views >= 9900000 AND views < 10000000)
+        UNION
+        SELECT ls.*
+        FROM latest_video_snapshot ls
+        WHERE
+            (views >= 90000 AND views < 100000) OR
             (views >= 900000 AND views < 1000000) OR
             (views >= 9900000 AND views < 10000000)
     `);

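A note on PATCH 04/12 and PATCH 05/12: together they replace the open-ended "views < 100000" condition with bounded windows, and the final query widens the 100k window for videos that appear in the songs table (50k and up) while every other video only qualifies from 90k. A small sketch of those thresholds follows; isSong is an assumption standing in for "the aid exists in the songs table".

    // Mirrors the view-count windows encoded in the final query of PATCH 05/12.
    function isNearMilestone(views: number, isSong: boolean): boolean {
        const lowerBound100k = isSong ? 50_000 : 90_000;
        return (
            (views >= lowerBound100k && views < 100_000) ||
            (views >= 900_000 && views < 1_000_000) ||
            (views >= 9_900_000 && views < 10_000_000)
        );
    }

    console.log(isNearMilestone(60_000, true));   // true:  songs get the wider 100k window
    console.log(isNearMilestone(60_000, false));  // false: other videos qualify from 90k
    console.log(isNearMilestone(950_000, false)); // true:  the 1M and 10M windows are shared
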
From 7a7c5cada935237e567600ea3bb5fa953057911e Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:25:39 +0800
Subject: [PATCH 06/12] feat: full snapshot archiving for all videos

---
 packages/crawler/db/snapshotSchedule.ts  | 30 ++++++++++---
 packages/crawler/mq/exec/snapshotTick.ts | 57 +++++++++++++++++++-----
 packages/crawler/mq/init.ts              |  7 ++-
 packages/crawler/src/worker.ts           |  3 ++
 4 files changed, 78 insertions(+), 19 deletions(-)

diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts
index fa56f44..7fb4080 100644
--- a/packages/crawler/db/snapshotSchedule.ts
+++ b/packages/crawler/db/snapshotSchedule.ts
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
 import { MINUTE } from "$std/datetime/constants.ts";
-import { redis } from "../../core/db/redis.ts";
+import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";
 
 const REDIS_KEY = "cvsa:snapshot_window_counts";
@@ -272,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) {
 
 export async function getBulkSnapshotsInNextSecond(client: Client) {
     const query = `
-        SELECT *
-        FROM snapshot_schedule
-        WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
-        ORDER BY started_at
-        LIMIT 1000;
+        SELECT *
+        FROM snapshot_schedule
+        WHERE (started_at <= NOW() + INTERVAL '15 seconds')
+            AND status = 'pending'
+            AND (type = 'normal' OR type = 'archive')
+        ORDER BY CASE
+                     WHEN type = 'normal' THEN 1
+                     WHEN type = 'archive' THEN 2
+                 END,
+                 started_at
+        LIMIT 1000;
     `;
     const res = await client.queryObject(query, []);
     return res.rows;
@@ -306,3 +312,15 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
     const res = await client.queryObject<{ aid: number }>(query, []);
     return res.rows.map((r) => Number(r.aid));
 }
+
+export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
+    const query: string = `
+        SELECT s.aid
+        FROM bilibili_metadata s
+        LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
+        WHERE ss.aid IS NULL
+    `;
+    const res = await client.queryObject<{ aid: number }>(query, []);
+    return res.rows.map((r) => Number(r.aid));
+}
+
diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 39f9ca1..9855b1a 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -6,7 +6,7 @@ import {
     bulkScheduleSnapshot,
     bulkSetSnapshotStatus,
     findClosestSnapshot,
-    findSnapshotBefore,
+    findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
     getBulkSnapshotsInNextSecond,
     getLatestSnapshot,
     getSnapshotsInNextSecond,
@@ -28,6 +28,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { getSongsPublihsedAt } from "db/songs.ts";
 import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
 import { getAdjustedShortTermETA } from "../scheduling.ts";
+import {SnapshotScheduleType} from "@core/db/schema";
 
 const priorityMap: { [key: string]: number } = {
     "milestone": 1,
@@ -52,13 +53,8 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
             const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
             if (filteredAids.length === 0) continue;
             await bulkSetSnapshotStatus(client, filteredAids, "processing");
-            const dataMap: { [key: number]: number } = {};
-            for (const schedule of group) {
-                const id = Number(schedule.id);
-                dataMap[id] = Number(schedule.aid);
-            }
             await SnapshotQueue.add("bulkSnapshotVideo", {
-                map: dataMap,
+                schedules: group,
             }, { priority: 3 });
         }
         return `OK`
@@ -146,6 +142,38 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
     return 6;
 };
 
+export const archiveSnapshotsWorker = async (_job: Job) => {
+    const client = await db.connect();
+    const startedAt = Date.now();
+    if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
+        logger.log("dispatchArchiveSnapshots is already running", "mq");
+        client.release();
+        return;
+    }
+    await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
+    try {
+        const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
+        for (const rawAid of aids) {
+            const aid = Number(rawAid);
+            const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+            const now = Date.now();
+            const lastSnapshotedAt = latestSnapshot?.time ?? now;
+            const interval = 168;
+            logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+            const targetTime = lastSnapshotedAt + interval * HOUR;
+            await scheduleSnapshot(client, aid, "archive", targetTime);
+            if (now - startedAt > 250 * MINUTE) {
+                return;
+            }
+        }
+    } catch (e) {
+        logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
+    } finally {
+        await lockManager.releaseLock("dispatchArchiveSnapshots");
+        client.release();
+    }
+};
+
 export const regularSnapshotsWorker = async (_job: Job) => {
     const client = await db.connect();
     const startedAt = Date.now();
@@ -179,13 +207,14 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 };
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-    const dataMap: { [key: number]: number } = job.data.map;
-    const ids = Object.keys(dataMap).map((id) => Number(id));
+    const schedules: SnapshotScheduleType[] = job.data.schedules;
+    const ids = schedules.map((schedule) => Number(schedule.id));
     const aidsToFetch: number[] = [];
     const client = await db.connect();
     try {
-        for (const id of ids) {
-            const aid = Number(dataMap[id]);
+        for (const schedule of schedules) {
+            const aid = Number(schedule.aid);
+            const id = Number(schedule.id);
             const exists = await snapshotScheduleExists(client, id);
             if (!exists) {
                 continue;
             }
@@ -220,7 +249,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
             logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
         }
         await bulkSetSnapshotStatus(client, ids, "completed");
-        for (const aid of aidsToFetch) {
+
+        for (const schedule of schedules) {
+            const aid = Number(schedule.aid);
+            const type = schedule.type;
+            if (type == 'archive') continue;
             const interval = await getRegularSnapshotInterval(client, aid);
             logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
             await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts
index 9db7afc..18703b7 100644
--- a/packages/crawler/mq/init.ts
+++ b/packages/crawler/mq/init.ts
@@ -1,4 +1,4 @@
-import { MINUTE, SECOND } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
@@ -55,6 +55,11 @@ export async function initMQ() {
         immediately: true,
     });
 
+    await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
+        every: 6 * HOUR,
+        immediately: true,
+    });
+
     await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
         every: 30 * MINUTE,
         immediately: true,
diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts
index cde3f89..9c5a902 100644
--- a/packages/crawler/src/worker.ts
+++ b/packages/crawler/src/worker.ts
@@ -6,6 +6,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { WorkerError } from "mq/schema.ts";
 import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
 import {
+    archiveSnapshotsWorker,
     bulkSnapshotTickWorker,
     collectMilestoneSnapshotsWorker,
     regularSnapshotsWorker,
@@ -82,6 +83,8 @@ const snapshotWorker = new Worker(
                 return await takeBulkSnapshotForVideosWorker(job);
             case "bulkSnapshotTick":
                 return await bulkSnapshotTickWorker(job);
+            case "dispatchArchiveSnapshots":
+                return await archiveSnapshotsWorker(job);
             default:
                 break;
         }

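A note on PATCH 06/12: archive schedules are dispatched every 6 hours, target a point 168 hours (one week) after each video's latest snapshot, and are picked up by the same bulk query as normal schedules, but only after the normal ones thanks to the CASE ordering. A sketch of that ordering as plain TypeScript follows; the row shape is an illustrative subset of SnapshotScheduleType, not the full schema type.

    // Reproduces the ORDER BY of the updated getBulkSnapshotsInNextSecond query:
    // pending rows drain normal-first, then archive, oldest started_at first, 1000 per tick.
    type ScheduleRow = { id: number; type: "normal" | "archive"; started_at: number };

    const typeRank = (t: ScheduleRow["type"]): number => (t === "normal" ? 1 : 2);

    function pickNextBatch(rows: ScheduleRow[], limit = 1000): ScheduleRow[] {
        return [...rows]
            .sort((a, b) => typeRank(a.type) - typeRank(b.type) || a.started_at - b.started_at)
            .slice(0, limit);
    }

    const batch = pickNextBatch([
        { id: 1, type: "archive", started_at: 100 },
        { id: 2, type: "normal", started_at: 300 },
        { id: 3, type: "normal", started_at: 200 },
    ]);
    console.log(batch.map((r) => r.id)); // [ 3, 2, 1 ]
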
now; + const interval = 168; + logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); + const targetTime = lastSnapshotedAt + interval * HOUR; + await scheduleSnapshot(client, aid, "archive", targetTime); + if (now - startedAt > 250 * MINUTE) { + return; + } + } + } catch (e) { + logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker"); + } finally { + await lockManager.releaseLock("dispatchArchiveSnapshots"); + client.release(); + } +}; + export const regularSnapshotsWorker = async (_job: Job) => { const client = await db.connect(); const startedAt = Date.now(); @@ -179,13 +207,14 @@ export const regularSnapshotsWorker = async (_job: Job) => { }; export const takeBulkSnapshotForVideosWorker = async (job: Job) => { - const dataMap: { [key: number]: number } = job.data.map; - const ids = Object.keys(dataMap).map((id) => Number(id)); + const schedules: SnapshotScheduleType[] = job.data.schedules; + const ids = schedules.map((schedule) => Number(schedule.id)); const aidsToFetch: number[] = []; const client = await db.connect(); try { - for (const id of ids) { - const aid = Number(dataMap[id]); + for (const schedule of schedules) { + const aid = Number(schedule.aid); + const id = Number(schedule.id); const exists = await snapshotScheduleExists(client, id); if (!exists) { continue; @@ -220,7 +249,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker"); } await bulkSetSnapshotStatus(client, ids, "completed"); - for (const aid of aidsToFetch) { + + for (const schedule of schedules) { + const aid = Number(schedule.aid); + const type = schedule.type; + if (type == 'archive') continue; const interval = await getRegularSnapshotInterval(client, aid); logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR); diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index 9db7afc..18703b7 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -1,4 +1,4 @@ -import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts"; import logger from "log/logger.ts"; import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts"; @@ -55,6 +55,11 @@ export async function initMQ() { immediately: true, }); + await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", { + every: 6 * HOUR, + immediately: true, + }); + await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { every: 30 * MINUTE, immediately: true, diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index cde3f89..9c5a902 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -6,6 +6,7 @@ import { lockManager } from "mq/lockManager.ts"; import { WorkerError } from "mq/schema.ts"; import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts"; import { + archiveSnapshotsWorker, bulkSnapshotTickWorker, collectMilestoneSnapshotsWorker, regularSnapshotsWorker, @@ -82,6 +83,8 @@ const snapshotWorker = new Worker( return await takeBulkSnapshotForVideosWorker(job); case "bulkSnapshotTick": return await bulkSnapshotTickWorker(job); + case "dispatchArchiveSnapshots": + return await archiveSnapshotsWorker(job); default: break; } From 
From f1651fee30dc62427fbb79493545cba7dd1940eb Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:28:45 +0800
Subject: [PATCH 08/12] fix: did not release lock for dispatchArchiveSnapshots when quitting

---
 packages/crawler/mq/exec/snapshotTick.ts | 2 +-
 packages/crawler/src/worker.ts           | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 8fbc50f..00ae1bc 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -159,7 +159,7 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
             const now = Date.now();
             const lastSnapshotedAt = latestSnapshot?.time ?? now;
             const interval = 168;
-            logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq");
+            logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker");
             const targetTime = lastSnapshotedAt + interval * HOUR;
             await scheduleSnapshot(client, aid, "archive", targetTime);
             if (now - startedAt > 250 * MINUTE) {
diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts
index 9c5a902..065857e 100644
--- a/packages/crawler/src/worker.ts
+++ b/packages/crawler/src/worker.ts
@@ -99,4 +99,5 @@ snapshotWorker.on("error", (err) => {
 
 snapshotWorker.on("closed", async () => {
     await lockManager.releaseLock("dispatchRegularSnapshots");
+    await lockManager.releaseLock("dispatchArchiveSnapshots");
 });

From 21c918f1fa1ff31e092f43434889e06eb4cf41c2 Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:32:06 +0800
Subject: [PATCH 09/12] fix: BigInt serialization in bulkSnapshotTickWorker

---
 packages/crawler/mq/exec/snapshotTick.ts | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 00ae1bc..73e76a7 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -53,8 +53,19 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
             const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
             if (filteredAids.length === 0) continue;
             await bulkSetSnapshotStatus(client, filteredAids, "processing");
+            const schedulesData = group.map((schedule) => {
+                return {
+                    aid: Number(schedule.aid),
+                    id: Number(schedule.id),
+                    type: schedule.type,
+                    created_at: schedule.created_at,
+                    started_at: schedule.started_at,
+                    finished_at: schedule.finished_at,
+                    status: schedule.status
+                }
+            })
             await SnapshotQueue.add("bulkSnapshotVideo", {
-                schedules: group,
+                schedules: schedulesData,
             }, { priority: 3 });
         }
         return `OK`
@@ -80,7 +91,7 @@ export const snapshotTickWorker = async (_job: Job) => {
             const aid = Number(schedule.aid);
             await setSnapshotStatus(client, schedule.id, "processing");
             await SnapshotQueue.add("snapshotVideo", {
-                aid: aid,
+                aid: Number(aid),
                 id: Number(schedule.id),
                 type: schedule.type ?? "normal",
             }, { priority });

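A note on PATCH 09/12: BullMQ serializes job data to JSON before it goes into Redis, and JSON.stringify throws a TypeError for BigInt values, which is what Postgres bigint columns such as id and aid can come back as. Converting the schedule rows to plain numbers before enqueueing sidesteps that. A minimal sketch, assuming a row whose ids arrive as BigInt:

    // The failure PATCH 09/12 works around: BigInt has no JSON representation.
    const row = { id: 42n, aid: 170001n, type: "normal" };

    try {
        JSON.stringify(row);
    } catch (e) {
        console.log(e instanceof TypeError); // true: BigInt cannot be serialized to JSON
    }

    // Converting up front, as bulkSnapshotTickWorker now does, keeps the payload JSON-safe.
    const safe = { ...row, id: Number(row.id), aid: Number(row.aid) };
    console.log(JSON.stringify(safe)); // {"id":42,"aid":170001,"type":"normal"}
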
From bae1f84beaf4a1443a0f722e1d27f20f4563171e Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:39:17 +0800
Subject: [PATCH 10/12] add: logging for releasing locks

---
 packages/crawler/src/worker.ts | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts
index 065857e..d002ece 100644
--- a/packages/crawler/src/worker.ts
+++ b/packages/crawler/src/worker.ts
@@ -1,6 +1,6 @@
 import { ConnectionOptions, Job, Worker } from "bullmq";
 import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
-import { redis } from "../../core/db/redis.ts";
+import { redis } from "@core/db/redis.ts";
 import logger from "log/logger.ts";
 import { lockManager } from "mq/lockManager.ts";
 import { WorkerError } from "mq/schema.ts";
@@ -100,4 +100,5 @@ snapshotWorker.on("error", (err) => {
 snapshotWorker.on("closed", async () => {
     await lockManager.releaseLock("dispatchRegularSnapshots");
     await lockManager.releaseLock("dispatchArchiveSnapshots");
+    logger.log(`Released lock: dispatchArchiveSnapshots, dispatchRegularSnapshots`, "mq", "snapshotWorker:on:closed");
 });

From 6df6345ec19c6a59e0d1d381255ef41a05dea6f8 Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:46:43 +0800
Subject: [PATCH 11/12] update: status of schedule when no proxy is available

---
 packages/crawler/mq/exec/snapshotTick.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 73e76a7..69f36de 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -277,7 +277,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
                 "mq",
                 "fn:takeBulkSnapshotForVideosWorker",
             );
-            await bulkSetSnapshotStatus(client, ids, "completed");
+            await bulkSetSnapshotStatus(client, ids, "no_proxy");
             await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
             return;
         }
@@ -356,7 +356,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
                 "mq",
                 "fn:takeSnapshotForVideoWorker",
             );
-            await setSnapshotStatus(client, id, "completed");
+            await setSnapshotStatus(client, id, "no_proxy");
             await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
             return;
         }

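A note on PATCH 11/12, taken together with PATCH 03/12: when no proxy is available the schedule is now marked "no_proxy" instead of "completed", so those rows stay distinguishable from real completions, and the retry is spread uniformly over a 20-minute window rather than a fixed 2 minutes, so a burst of failed schedules does not come back at the same instant. A sketch of that jitter; MINUTE here is just 60 * 1000, standing in for the constant from $std/datetime/constants.ts.

    // Uniform retry jitter over a 20-minute window, as used on the no-proxy path.
    const MINUTE = 60 * 1000;

    const nextAttemptAt = (now: number): number => now + 20 * MINUTE * Math.random();

    // Three schedules failing at the same moment land at three different retry times.
    const now = Date.now();
    console.log([nextAttemptAt(now), nextAttemptAt(now), nextAttemptAt(now)]);
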
From 0614067278a2adf8dc5b7cdbb84933f0b69dc102 Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Mon, 14 Apr 2025 00:49:57 +0800
Subject: [PATCH 12/12] update: handler for SIGINT & SIGTERM

---
 packages/crawler/src/worker.ts | 26 +++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts
index d002ece..0e3af64 100644
--- a/packages/crawler/src/worker.ts
+++ b/packages/crawler/src/worker.ts
@@ -16,8 +16,21 @@ import {
     takeSnapshotForVideoWorker,
 } from "mq/exec/snapshotTick.ts";
 
+const releaseLockForJob = async (name: string) => {
+    await lockManager.releaseLock(name);
+    logger.log(`Released lock: ${name}`, "mq");
+}
+
+const releaseAllLocks = async () => {
+    const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
+    for (const lock of locks) {
+        await releaseLockForJob(lock);
+    }
+}
+
 Deno.addSignalListener("SIGINT", async () => {
     logger.log("SIGINT Received: Shutting down workers...", "mq");
+    await releaseAllLocks();
     await latestVideoWorker.close(true);
     await snapshotWorker.close(true);
     Deno.exit();
@@ -25,6 +38,7 @@ Deno.addSignalListener("SIGINT", async () => {
 
 Deno.addSignalListener("SIGTERM", async () => {
     logger.log("SIGTERM Received: Shutting down workers...", "mq");
+    await releaseAllLocks();
     await latestVideoWorker.close(true);
     await snapshotWorker.close(true);
     Deno.exit();
@@ -61,10 +75,6 @@ latestVideoWorker.on("error", (err) => {
     logger.error(e.rawError, e.service, e.codePath);
 });
 
-latestVideoWorker.on("closed", async () => {
-    await lockManager.releaseLock("getLatestVideos");
-});
-
 const snapshotWorker = new Worker(
     "snapshot",
     async (job: Job) => {
@@ -95,10 +105,4 @@ const snapshotWorker = new Worker(
 snapshotWorker.on("error", (err) => {
     const e = err as WorkerError;
     logger.error(e.rawError, e.service, e.codePath);
-});
-
-snapshotWorker.on("closed", async () => {
-    await lockManager.releaseLock("dispatchRegularSnapshots");
-    await lockManager.releaseLock("dispatchArchiveSnapshots");
-    logger.log(`Released lock: dispatchArchiveSnapshots, dispatchRegularSnapshots`, "mq", "snapshotWorker:on:closed");
-});
+});
\ No newline at end of file

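A note on PATCH 12/12: releasing the Redis locks now happens inside the signal handlers, before the workers close and Deno.exit() runs, instead of in the workers' "closed" event handlers, so the cleanup no longer depends on those events firing before the process exits and a stale lock cannot block the next run until its TTL expires. A sketch of that ordering with stub lock and worker calls; the commented lines mark where the real lockManager.releaseLock and Worker.close(true) calls sit in the patch.

    // Shutdown ordering established by the patch, with stubs standing in for the
    // real lockManager and the two BullMQ workers.
    const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];

    const releaseAllLocks = async (): Promise<void> => {
        for (const lock of locks) {
            // await lockManager.releaseLock(lock);
            console.log(`Released lock: ${lock}`);
        }
    };

    const shutdown = async (signal: string): Promise<void> => {
        console.log(`${signal} Received: Shutting down workers...`);
        await releaseAllLocks();               // 1. nothing stale left behind in Redis
        // await latestVideoWorker.close(true); // 2. stop the BullMQ workers
        // await snapshotWorker.close(true);
        Deno.exit();                           // 3. only now exit immediately
    };

    for (const signal of ["SIGINT", "SIGTERM"] as const) {
        Deno.addSignalListener(signal, () => void shutdown(signal));
    }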