fix: feeding empty data to the filter model

This commit is contained in:
alikia2x (寒寒) 2025-02-22 20:56:57 +08:00
parent cecc1c1d2c
commit b14a43f63f
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
9 changed files with 52 additions and 38 deletions

View File

@ -10,10 +10,11 @@
"build": "deno run -A dev.ts build", "build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts", "preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .", "update": "deno run -A -r https://fresh.deno.dev/update .",
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'", "all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
}, },
"lint": { "lint": {

View File

@ -1,7 +1,7 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime"; import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { WorkerError } from "src/worker.ts"; import { WorkerError } from "../mq/schema.ts";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024"; const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx"; const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
@ -97,7 +97,6 @@ export async function classifyVideo(
tags, tags,
author_info, author_info,
], sessionEmbedding); ], sessionEmbedding);
const probabilities = await runClassification(embeddings); const probabilities = await runClassification(embeddings);
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml") logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml")
return probabilities.indexOf(Math.max(...probabilities)); return probabilities.indexOf(Math.max(...probabilities));

View File

@ -3,6 +3,7 @@ import { db } from "lib/db/init.ts";
import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts"; import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts";
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts"; import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
export const classifyVideoWorker = async (job: Job) => { export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect(); const client = await db.connect();
@ -12,10 +13,22 @@ export const classifyVideoWorker = async (job: Job) => {
} }
const videoInfo = await getVideoInfoFromAllData(client, aid); const videoInfo = await getVideoInfoFromAllData(client, aid);
const label = await classifyVideo(videoInfo.title ?? "", videoInfo.description ?? "", videoInfo.tags ?? "", "", aid); const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A";
const tags = videoInfo.tags?.trim() || "empty";
const authorInfo = "No";
const label = await classifyVideo(title, description, tags, authorInfo, aid);
if (label == -1) {
logger.warn(`Failed to classify video ${aid}`, "ml");
}
insertVideoLabel(client, aid, label); insertVideoLabel(client, aid, label);
client.release(); client.release();
job.updateData({
...job.data, label: label,
});
return 0; return 0;
}; };
@ -24,7 +37,11 @@ export const classifyVideosWorker = async () => {
const client = await db.connect(); const client = await db.connect();
const videos = await getUnlabeledVideos(client); const videos = await getUnlabeledVideos(client);
client.release(); client.release();
let i = 0;
for (const aid of videos) { for (const aid of videos) {
await ClassifyVideoQueue.add("classifyVideo", { aid }); if (i > 200) return 10000 + i;
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
i++;
} }
return 0;
}; };

View File

@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts";
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts"; import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
import { getVideoTags } from "lib/net/getVideoTags.ts"; import { getVideoTags } from "lib/net/getVideoTags.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { WorkerError } from "src/worker.ts"; import { WorkerError } from "../schema.ts";
const delayMap = [0.5, 3, 5, 15, 30, 60]; const delayMap = [0.5, 3, 5, 15, 30, 60];
const getJobPriority = (diff: number) => { const getJobPriority = (diff: number) => {

View File

@ -1,22 +1,19 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
async function configGetLatestVideos() { export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE, every: 1 * MINUTE
}); });
}
async function configGetVideosTags() {
await VideoTagsQueue.upsertJobScheduler("getVideosTags", { await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
every: 30 * SECOND, every: 30 * SECOND,
immediately: true, immediately: true,
}); });
} await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 30 * SECOND,
immediately: true,
})
export async function initMQ() {
await configGetLatestVideos();
await configGetVideosTags();
logger.log("Message queue initialized."); logger.log("Message queue initialized.");
} }

12
lib/mq/schema.ts Normal file
View File

@ -0,0 +1,12 @@
/**
 * Error type that wraps another `Error` raised while a worker was running.
 *
 * The wrapper reuses the wrapped error's `message`, keeps a reference to the
 * wrapped error itself, and carries two optional string labels supplied by
 * the caller.
 *
 * Note that `name` is set to `"WorkerFailure"`, not to the class name.
 */
export class WorkerError extends Error {
    /** Optional label passed as the second constructor argument (presumably the originating service; confirm with callers). */
    public service?: string;

    /** Optional label passed as the third constructor argument (presumably where in the code the failure happened; confirm with callers). */
    public codePath?: string;

    /** The error this instance wraps, exactly as it was passed in. */
    public rawError: Error;

    /**
     * @param rawError The underlying error; its `message` becomes this error's message.
     * @param service Optional label stored on `service`.
     * @param codePath Optional label stored on `codePath`.
     */
    constructor(
        rawError: Error,
        service?: string,
        codePath?: string,
    ) {
        // Reuse the wrapped error's message so existing log output keeps the same text.
        super(rawError.message);

        this.name = "WorkerFailure";
        this.codePath = codePath;
        this.service = service;
        this.rawError = rawError;
    }
}

View File

@ -2,13 +2,13 @@ import express from "express";
import { createBullBoard } from "@bull-board/api"; import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express"; import { ExpressAdapter } from "@bull-board/express";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter(); const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/"); serverAdapter.setBasePath("/");
createBullBoard({ createBullBoard({
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)], queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)],
serverAdapter: serverAdapter, serverAdapter: serverAdapter,
}); });

View File

@ -1,8 +1,8 @@
import { Job, Worker } from "bullmq"; import { Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { WorkerError } from "src/worker.ts";
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts"; import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "../lib/mq/schema.ts";
const filterWorker = new Worker( const filterWorker = new Worker(
"classifyVideo", "classifyVideo",
@ -16,11 +16,11 @@ const filterWorker = new Worker(
break; break;
} }
}, },
{ connection: redis, concurrency: 1, removeOnComplete: { count: 1000 } }, { connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } },
); );
filterWorker.on("active", () => { filterWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (filter) activated.", "mq");
}); });
filterWorker.on("error", (err) => { filterWorker.on("error", (err) => {

View File

@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts";
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "../lib/mq/schema.ts";
/**
 * Error type that wraps another `Error` raised while a worker was running.
 *
 * The wrapper reuses the wrapped error's `message`, keeps a reference to the
 * wrapped error itself, and carries two optional string labels supplied by
 * the caller.
 *
 * Note that `name` is set to `"WorkerFailure"`, not to the class name.
 */
export class WorkerError extends Error {
    /** Optional label passed as the second constructor argument (presumably the originating service; confirm with callers). */
    public service?: string;

    /** Optional label passed as the third constructor argument (presumably where in the code the failure happened; confirm with callers). */
    public codePath?: string;

    /** The error this instance wraps, exactly as it was passed in. */
    public rawError: Error;

    /**
     * @param rawError The underlying error; its `message` becomes this error's message.
     * @param service Optional label stored on `service`.
     * @param codePath Optional label stored on `codePath`.
     */
    constructor(
        rawError: Error,
        service?: string,
        codePath?: string,
    ) {
        // Reuse the wrapped error's message so existing log output keeps the same text.
        super(rawError.message);

        this.name = "WorkerFailure";
        this.codePath = codePath;
        this.service = service;
        this.rawError = rawError;
    }
}
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq"); logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -48,7 +36,7 @@ const latestVideoWorker = new Worker(
); );
latestVideoWorker.on("active", () => { latestVideoWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (latestVideos) activated.", "mq");
}); });
latestVideoWorker.on("error", (err) => { latestVideoWorker.on("error", (err) => {
@ -82,7 +70,7 @@ const videoTagsWorker = new Worker(
); );
videoTagsWorker.on("active", () => { videoTagsWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (videoTags) activated.", "mq");
}); });
videoTagsWorker.on("error", (err) => { videoTagsWorker.on("error", (err) => {