diff --git a/lib/mq/exec/getVideoTags.ts b/lib/mq/exec/getVideoTags.ts index 72c678e..6d713cc 100644 --- a/lib/mq/exec/getVideoTags.ts +++ b/lib/mq/exec/getVideoTags.ts @@ -78,6 +78,7 @@ export const getVideoTagsWorker = async (job: Job) => { export const getVideoTagsInitializer = async () => { const client = await db.connect(); const videos = await getNullVideoTagsList(client); + client.release(); if (videos.length == 0) { return 4; } diff --git a/src/worker.ts b/src/worker.ts index 75dfb94..98ed995 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -2,8 +2,9 @@ import { Job, Worker } from "bullmq"; import { getLatestVideosWorker } from "lib/mq/executors.ts"; import { redis } from "lib/db/redis.ts"; import logger from "lib/log/logger.ts"; -import {getVideoTagsWorker} from "lib/mq/exec/getVideoTags.ts"; +import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; +import { lockManager } from "lib/mq/lockManager.ts"; export class WorkerError extends Error { public service?: string; @@ -18,6 +19,20 @@ export class WorkerError extends Error { } } +Deno.addSignalListener("SIGINT", async () => { + logger.log("SIGINT Received: Shutting down workers...", "mq"); + await latestVideoWorker.close(true); + await videoTagsWorker.close(true); + Deno.exit(); +}); + +Deno.addSignalListener("SIGTERM", async () => { + logger.log("SIGTERM Received: Shutting down workers...", "mq"); + await latestVideoWorker.close(true); + await videoTagsWorker.close(true); + Deno.exit(); +}) + const latestVideoWorker = new Worker( "latestVideos", async (job: Job) => { @@ -41,6 +56,9 @@ latestVideoWorker.on("error", (err) => { logger.error(e.rawError, e.service, e.codePath); }); +latestVideoWorker.on("closed", async () => { + await lockManager.releaseLock("getLatestVideos"); +}); const videoTagsWorker = new Worker( "videoTags", @@ -65,4 +83,3 @@ videoTagsWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); }); -