From b14a43f63fb3ea7953088febd785b1df15324202 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 22 Feb 2025 20:56:57 +0800 Subject: [PATCH] fix: feeding empty data to the filter model --- deno.json | 5 +++-- lib/ml/filter_inference.ts | 3 +-- lib/mq/exec/classifyVideo.ts | 23 ++++++++++++++++++++--- lib/mq/exec/getVideoTags.ts | 2 +- lib/mq/init.ts | 17 +++++++---------- lib/mq/schema.ts | 12 ++++++++++++ src/bullui.ts | 4 ++-- src/filterWorker.ts | 6 +++--- src/worker.ts | 18 +++--------------- 9 files changed, 52 insertions(+), 38 deletions(-) create mode 100644 lib/mq/schema.ts diff --git a/deno.json b/deno.json index b358748..16c5998 100644 --- a/deno.json +++ b/deno.json @@ -10,10 +10,11 @@ "build": "deno run -A dev.ts build", "preview": "deno run -A main.ts", "update": "deno run -A -r https://fresh.deno.dev/update .", - "worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", + "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", + "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", - "all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'", + "all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" }, "lint": { diff --git a/lib/ml/filter_inference.ts b/lib/ml/filter_inference.ts index 4f7cb25..cb0f1e8 100644 --- a/lib/ml/filter_inference.ts +++ b/lib/ml/filter_inference.ts @@ -1,7 +1,7 @@ import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import * as ort from "onnxruntime"; import logger from "lib/log/logger.ts"; -import { WorkerError } from "src/worker.ts"; +import { WorkerError } from "../mq/schema.ts"; const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024"; const onnxClassifierPath = "./model/video_classifier_v3_11.onnx"; @@ -97,7 +97,6 @@ export async function classifyVideo( tags, author_info, ], sessionEmbedding); - const probabilities = await runClassification(embeddings); logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml") return probabilities.indexOf(Math.max(...probabilities)); diff --git a/lib/mq/exec/classifyVideo.ts b/lib/mq/exec/classifyVideo.ts index 848dd7b..195b79d 100644 --- a/lib/mq/exec/classifyVideo.ts +++ b/lib/mq/exec/classifyVideo.ts @@ -3,6 +3,7 @@ import { db } from "lib/db/init.ts"; import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts"; import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts"; +import logger from "lib/log/logger.ts"; export const classifyVideoWorker = async (job: Job) => { const client = await db.connect(); @@ -12,10 +13,22 @@ export const classifyVideoWorker = async (job: Job) => { } const videoInfo = await getVideoInfoFromAllData(client, aid); - const label = await classifyVideo(videoInfo.title ?? "", videoInfo.description ?? "", videoInfo.tags ?? "", "", aid); + const title = videoInfo.title?.trim() || "untitled"; + const description = videoInfo.description?.trim() || "N/A"; + const tags = videoInfo.tags?.trim() || "empty"; + const authorInfo = "No"; + const label = await classifyVideo(title, description, tags, authorInfo, aid); + if (label == -1) { + logger.warn(`Failed to classify video ${aid}`, "ml"); + } insertVideoLabel(client, aid, label); - + client.release(); + + job.updateData({ + ...job.data, label: label, + }); + return 0; }; @@ -24,7 +37,11 @@ export const classifyVideosWorker = async () => { const client = await db.connect(); const videos = await getUnlabeledVideos(client); client.release(); + let i = 0; for (const aid of videos) { - await ClassifyVideoQueue.add("classifyVideo", { aid }); + if (i > 200) return 10000 + i; + await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) }); + i++; } + return 0; }; diff --git a/lib/mq/exec/getVideoTags.ts b/lib/mq/exec/getVideoTags.ts index 6d713cc..1608098 100644 --- a/lib/mq/exec/getVideoTags.ts +++ b/lib/mq/exec/getVideoTags.ts @@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts"; import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts"; import { getVideoTags } from "lib/net/getVideoTags.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts"; -import { WorkerError } from "src/worker.ts"; +import { WorkerError } from "../schema.ts"; const delayMap = [0.5, 3, 5, 15, 30, 60]; const getJobPriority = (diff: number) => { diff --git a/lib/mq/init.ts b/lib/mq/init.ts index a79ba03..336d843 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -1,22 +1,19 @@ import { MINUTE, SECOND } from "$std/datetime/constants.ts"; -import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import logger from "lib/log/logger.ts"; -async function configGetLatestVideos() { +export async function initMQ() { await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { - every: 1 * MINUTE, + every: 1 * MINUTE }); -} - -async function configGetVideosTags() { await VideoTagsQueue.upsertJobScheduler("getVideosTags", { every: 30 * SECOND, immediately: true, }); -} + await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { + every: 30 * SECOND, + immediately: true, + }) -export async function initMQ() { - await configGetLatestVideos(); - await configGetVideosTags(); logger.log("Message queue initialized."); } diff --git a/lib/mq/schema.ts b/lib/mq/schema.ts new file mode 100644 index 0000000..9b48e99 --- /dev/null +++ b/lib/mq/schema.ts @@ -0,0 +1,12 @@ +export class WorkerError extends Error { + public service?: string; + public codePath?: string; + public rawError: Error; + constructor(rawError: Error, service?: string, codePath?: string) { + super(rawError.message); + this.name = "WorkerFailure"; + this.codePath = codePath; + this.service = service; + this.rawError = rawError; + } +} \ No newline at end of file diff --git a/src/bullui.ts b/src/bullui.ts index 9aab14b..1850bac 100644 --- a/src/bullui.ts +++ b/src/bullui.ts @@ -2,13 +2,13 @@ import express from "express"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { ExpressAdapter } from "@bull-board/express"; -import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath("/"); createBullBoard({ - queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)], + queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)], serverAdapter: serverAdapter, }); diff --git a/src/filterWorker.ts b/src/filterWorker.ts index 79a06db..51d80a2 100644 --- a/src/filterWorker.ts +++ b/src/filterWorker.ts @@ -1,8 +1,8 @@ import { Job, Worker } from "bullmq"; import { redis } from "lib/db/redis.ts"; import logger from "lib/log/logger.ts"; -import { WorkerError } from "src/worker.ts"; import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts"; +import { WorkerError } from "../lib/mq/schema.ts"; const filterWorker = new Worker( "classifyVideo", @@ -16,11 +16,11 @@ const filterWorker = new Worker( break; } }, - { connection: redis, concurrency: 1, removeOnComplete: { count: 1000 } }, + { connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } }, ); filterWorker.on("active", () => { - logger.log("Worker activated.", "mq"); + logger.log("Worker (filter) activated.", "mq"); }); filterWorker.on("error", (err) => { diff --git a/src/worker.ts b/src/worker.ts index fc483ed..0540d3a 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts"; import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; import { lockManager } from "lib/mq/lockManager.ts"; - -export class WorkerError extends Error { - public service?: string; - public codePath?: string; - public rawError: Error; - constructor(rawError: Error, service?: string, codePath?: string) { - super(rawError.message); - this.name = "WorkerFailure"; - this.codePath = codePath; - this.service = service; - this.rawError = rawError; - } -} +import { WorkerError } from "../lib/mq/schema.ts"; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); @@ -48,7 +36,7 @@ const latestVideoWorker = new Worker( ); latestVideoWorker.on("active", () => { - logger.log("Worker activated.", "mq"); + logger.log("Worker (latestVideos) activated.", "mq"); }); latestVideoWorker.on("error", (err) => { @@ -82,7 +70,7 @@ const videoTagsWorker = new Worker( ); videoTagsWorker.on("active", () => { - logger.log("Worker activated.", "mq"); + logger.log("Worker (videoTags) activated.", "mq"); }); videoTagsWorker.on("error", (err) => {