fix: didn't release after connected to database

add: gracefully shutdown
This commit is contained in:
alikia2x (寒寒) 2025-02-12 01:47:34 +08:00
parent d870627220
commit 7797fef685
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
2 changed files with 20 additions and 2 deletions

View File

@ -78,6 +78,7 @@ export const getVideoTagsWorker = async (job: Job) => {
export const getVideoTagsInitializer = async () => { export const getVideoTagsInitializer = async () => {
const client = await db.connect(); const client = await db.connect();
const videos = await getNullVideoTagsList(client); const videos = await getNullVideoTagsList(client);
client.release();
if (videos.length == 0) { if (videos.length == 0) {
return 4; return 4;
} }

View File

@ -2,8 +2,9 @@ import { Job, Worker } from "bullmq";
import { getLatestVideosWorker } from "lib/mq/executors.ts"; import { getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import {getVideoTagsWorker} from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
import { lockManager } from "lib/mq/lockManager.ts";
export class WorkerError extends Error { export class WorkerError extends Error {
public service?: string; public service?: string;
@ -18,6 +19,20 @@ export class WorkerError extends Error {
} }
} }
Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await videoTagsWorker.close(true);
Deno.exit();
});
Deno.addSignalListener("SIGTERM", async () => {
logger.log("SIGTERM Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await videoTagsWorker.close(true);
Deno.exit();
})
const latestVideoWorker = new Worker( const latestVideoWorker = new Worker(
"latestVideos", "latestVideos",
async (job: Job) => { async (job: Job) => {
@ -41,6 +56,9 @@ latestVideoWorker.on("error", (err) => {
logger.error(e.rawError, e.service, e.codePath); logger.error(e.rawError, e.service, e.codePath);
}); });
latestVideoWorker.on("closed", async () => {
await lockManager.releaseLock("getLatestVideos");
});
const videoTagsWorker = new Worker( const videoTagsWorker = new Worker(
"videoTags", "videoTags",
@ -65,4 +83,3 @@ videoTagsWorker.on("error", (err) => {
const e = err as WorkerError; const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath); logger.error(e.rawError, e.service, e.codePath);
}); });