From 0614067278a2adf8dc5b7cdbb84933f0b69dc102 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 14 Apr 2025 00:49:57 +0800 Subject: [PATCH] update: handler for SIGINT & SIGTERM --- packages/crawler/src/worker.ts | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index d002ece..0e3af64 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -16,8 +16,21 @@ import { takeSnapshotForVideoWorker, } from "mq/exec/snapshotTick.ts"; +const releaseLockForJob = async (name: string) => { + await lockManager.releaseLock(name); + logger.log(`Released lock: ${name}`, "mq"); +} + +const releaseAllLocks = async () => { + const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"]; + for (const lock of locks) { + await releaseLockForJob(lock); + } +} + Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); Deno.exit(); @@ -25,6 +38,7 @@ Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGTERM", async () => { logger.log("SIGTERM Received: Shutting down workers...", "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); Deno.exit(); @@ -61,10 +75,6 @@ latestVideoWorker.on("error", (err) => { logger.error(e.rawError, e.service, e.codePath); }); -latestVideoWorker.on("closed", async () => { - await lockManager.releaseLock("getLatestVideos"); -}); - const snapshotWorker = new Worker( "snapshot", async (job: Job) => { @@ -95,10 +105,4 @@ const snapshotWorker = new Worker( snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); -}); - -snapshotWorker.on("closed", async () => { - await lockManager.releaseLock("dispatchRegularSnapshots"); - await lockManager.releaseLock("dispatchArchiveSnapshots"); - logger.log(`Released lock: dispatchArchiveSnapshots, dispatchRegularSnapshots`, "mq", "snapshotWorker:on:closed"); -}); +}); \ No newline at end of file