update: add lock for classifyVideo
This commit is contained in:
parent
46191cfd56
commit
c23753aceb
@ -14,7 +14,7 @@
|
|||||||
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||||
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||||
"all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||||
},
|
},
|
||||||
"lint": {
|
"lint": {
|
||||||
|
@ -4,6 +4,7 @@ import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel} from "l
|
|||||||
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
|
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
|
||||||
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
import { lockManager } from "lib/mq/lockManager.ts";
|
||||||
|
|
||||||
export const classifyVideoWorker = async (job: Job) => {
|
export const classifyVideoWorker = async (job: Job) => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
@ -33,16 +34,29 @@ export const classifyVideoWorker = async (job: Job) => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export const classifyVideosWorker = async () => {
|
export const classifyVideosWorker = async () => {
|
||||||
|
if (await lockManager.isLocked("classifyVideos")) {
|
||||||
|
logger.log("job:classifyVideos is locked, skipping.", "mq");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
lockManager.acquireLock("classifyVideos");
|
||||||
|
|
||||||
await initializeModels();
|
await initializeModels();
|
||||||
|
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
const videos = await getUnlabelledVideos(client);
|
const videos = await getUnlabelledVideos(client);
|
||||||
logger.log(`Found ${videos.length} unlabelled videos`)
|
logger.log(`Found ${videos.length} unlabelled videos`)
|
||||||
client.release();
|
client.release();
|
||||||
|
|
||||||
let i = 0;
|
let i = 0;
|
||||||
for (const aid of videos) {
|
for (const aid of videos) {
|
||||||
if (i > 200) return 10000 + i;
|
if (i > 200) {
|
||||||
|
lockManager.releaseLock("classifyVideos");
|
||||||
|
return 10000 + i;
|
||||||
|
}
|
||||||
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
|
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
lockManager.releaseLock("classifyVideos");
|
||||||
return 0;
|
return 0;
|
||||||
};
|
};
|
||||||
|
@ -18,7 +18,6 @@ export async function insertLatestVideos(
|
|||||||
}
|
}
|
||||||
logger.log(`Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`, "net", "fn:insertLatestVideos()")
|
logger.log(`Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`, "net", "fn:insertLatestVideos()")
|
||||||
const videoIndex = await getVideoPositionInNewList(latestVideoTimestamp);
|
const videoIndex = await getVideoPositionInNewList(latestVideoTimestamp);
|
||||||
logger.log(`Position of the video in the latest list: ${videoIndex}`, "net", "fn:insertLatestVideos()")
|
|
||||||
if (videoIndex == null) {
|
if (videoIndex == null) {
|
||||||
logger.error("Cannot locate the video through bisect.", "net", "fn:insertLatestVideos()");
|
logger.error("Cannot locate the video through bisect.", "net", "fn:insertLatestVideos()");
|
||||||
return null
|
return null
|
||||||
|
@ -2,7 +2,20 @@ import { Job, Worker } from "bullmq";
|
|||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "lib/db/redis.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
|
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
|
||||||
import { WorkerError } from "../lib/mq/schema.ts";
|
import { WorkerError } from "lib/mq/schema.ts";
|
||||||
|
import { lockManager } from "lib/mq/lockManager.ts";
|
||||||
|
|
||||||
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
|
await filterWorker.close(true);
|
||||||
|
Deno.exit();
|
||||||
|
});
|
||||||
|
|
||||||
|
Deno.addSignalListener("SIGTERM", async () => {
|
||||||
|
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||||
|
await filterWorker.close(true);
|
||||||
|
Deno.exit();
|
||||||
|
});
|
||||||
|
|
||||||
const filterWorker = new Worker(
|
const filterWorker = new Worker(
|
||||||
"classifyVideo",
|
"classifyVideo",
|
||||||
@ -26,4 +39,8 @@ filterWorker.on("active", () => {
|
|||||||
filterWorker.on("error", (err) => {
|
filterWorker.on("error", (err) => {
|
||||||
const e = err as WorkerError;
|
const e = err as WorkerError;
|
||||||
logger.error(e.rawError, e.service, e.codePath);
|
logger.error(e.rawError, e.service, e.codePath);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
filterWorker.on("closed", async() => {
|
||||||
|
await lockManager.releaseLock("classifyVideos");
|
||||||
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user