diff --git a/deno.json b/deno.json index a8e546b..ecc8076 100644 --- a/deno.json +++ b/deno.json @@ -14,7 +14,7 @@ "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", - "all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", + "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" }, "lint": { diff --git a/lib/mq/exec/classifyVideo.ts b/lib/mq/exec/classifyVideo.ts index 4e3c935..aa09d8c 100644 --- a/lib/mq/exec/classifyVideo.ts +++ b/lib/mq/exec/classifyVideo.ts @@ -4,6 +4,7 @@ import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel} from "l import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts"; import logger from "lib/log/logger.ts"; +import { lockManager } from "lib/mq/lockManager.ts"; export const classifyVideoWorker = async (job: Job) => { const client = await db.connect(); @@ -33,16 +34,29 @@ export const classifyVideoWorker = async (job: Job) => { }; export const classifyVideosWorker = async () => { + if (await lockManager.isLocked("classifyVideos")) { + logger.log("job:classifyVideos is locked, skipping.", "mq"); + return; + } + + lockManager.acquireLock("classifyVideos"); + await initializeModels(); + const client = await db.connect(); const videos = await getUnlabelledVideos(client); logger.log(`Found ${videos.length} unlabelled videos`) client.release(); + let i = 0; for (const aid of videos) { - if (i > 200) return 10000 + i; + if (i > 200) { + lockManager.releaseLock("classifyVideos"); + return 10000 + i; + } await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) }); i++; } + lockManager.releaseLock("classifyVideos"); return 0; }; diff --git a/lib/task/insertLatestVideo.ts b/lib/task/insertLatestVideo.ts index 0cebd34..e6b750b 100644 --- a/lib/task/insertLatestVideo.ts +++ b/lib/task/insertLatestVideo.ts @@ -18,7 +18,6 @@ export async function insertLatestVideos( } logger.log(`Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`, "net", "fn:insertLatestVideos()") const videoIndex = await getVideoPositionInNewList(latestVideoTimestamp); - logger.log(`Position of the video in the latest list: ${videoIndex}`, "net", "fn:insertLatestVideos()") if (videoIndex == null) { logger.error("Cannot locate the video through bisect.", "net", "fn:insertLatestVideos()"); return null diff --git a/src/filterWorker.ts b/src/filterWorker.ts index 51d80a2..c4ace3c 100644 --- a/src/filterWorker.ts +++ b/src/filterWorker.ts @@ -2,7 +2,20 @@ import { Job, Worker } from "bullmq"; import { redis } from "lib/db/redis.ts"; import logger from "lib/log/logger.ts"; import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts"; -import { WorkerError } from "../lib/mq/schema.ts"; +import { WorkerError } from "lib/mq/schema.ts"; +import { lockManager } from "lib/mq/lockManager.ts"; + +Deno.addSignalListener("SIGINT", async () => { + logger.log("SIGINT Received: Shutting down workers...", "mq"); + await filterWorker.close(true); + Deno.exit(); +}); + +Deno.addSignalListener("SIGTERM", async () => { + logger.log("SIGTERM Received: Shutting down workers...", "mq"); + await filterWorker.close(true); + Deno.exit(); +}); const filterWorker = new Worker( "classifyVideo", @@ -26,4 +39,8 @@ filterWorker.on("active", () => { filterWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); -}); \ No newline at end of file +}); + +filterWorker.on("closed", async() => { + await lockManager.releaseLock("classifyVideos"); +})