Compare commits

...

4 Commits

13 changed files with 159 additions and 95 deletions

View File

@ -10,10 +10,11 @@
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
"all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
},
"lint": {

View File

@ -1,20 +1,37 @@
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { formatTimestampToPsql, parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { VideoListVideo } from "lib/net/bilibili.d.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
import { modelVersion } from "lib/ml/filter_inference.ts";
/**
 * Checks whether `all_data` already contains a row for the given video.
 *
 * @param client - Postgres client used to run the query.
 * @param aid - Video ID matched against the `aid` column.
 * @returns The `exists` flag from the single row produced by `SELECT EXISTS(...)`.
 */
export async function videoExistsInAllData(client: Client, aid: number) {
    const lookup = await client.queryObject<{ exists: boolean }>(
        `SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`,
        [aid],
    );
    const [firstRow] = lookup.rows;
    return firstRow.exists;
}
export async function insertIntoAllData(client: Client, data: AllDataType) {
/**
 * Checks whether `bili_user` already contains a row for the given user.
 *
 * @param client - Postgres client used to run the query.
 * @param uid - User ID matched against the `uid` column.
 * @returns The `exists` flag from the single row produced by `SELECT EXISTS(...)`.
 */
export async function biliUserExists(client: Client, uid: number) {
    const lookup = await client.queryObject<{ exists: boolean }>(
        `SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`,
        [uid],
    );
    const [firstRow] = lookup.rows;
    return firstRow.exists;
}
export async function insertIntoAllData(client: Client, data: VideoListVideo) {
logger.log(`inserted ${data.aid}`, "db-all_data");
return await client.queryObject(
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)
await client.queryObject(
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (aid) DO NOTHING`,
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
[
data.aid,
data.bvid,
data.desc,
data.owner.mid,
null,
data.title,
formatTimestampToPsql(data.pubdate * SECOND + 8 * HOUR),
data.duration,
],
);
}
@ -59,3 +76,25 @@ export async function getNullVideoTagsList(client: Client) {
},
);
}
/**
 * Lists the videos in `all_data` that have no matching row in `labelling_result`.
 *
 * The query is a LEFT JOIN on `aid` filtered to rows where the joined
 * `labelling_result.aid` is NULL, i.e. videos with no label at all.
 *
 * @param client - Postgres client used to run the query.
 * @returns The `aid` of every row returned, in the order the query returned them.
 */
export async function getUnlabeledVideos(client: Client) {
    const { rows } = await client.queryObject<{ aid: number }>(
        `SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
    );
    const aids: number[] = [];
    for (const { aid } of rows) {
        aids.push(aid);
    }
    return aids;
}
/**
 * Stores a classification label for a video under the current `modelVersion`.
 *
 * The insert uses `ON CONFLICT (aid, model_version) DO NOTHING`, so a row that
 * already exists for the same video and model version is left untouched.
 *
 * @param client - Postgres client used to run the statement.
 * @param aid - Video ID the label belongs to.
 * @param label - Label value to store.
 * @returns Whatever `client.queryObject` resolves to for the INSERT.
 */
export async function insertVideoLabel(client: Client, aid: number, label: number) {
    const statement =
        `INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`;
    const params = [aid, label, modelVersion];
    const outcome = await client.queryObject(statement, params);
    return outcome;
}
/**
 * Loads the `all_data` row for a video.
 *
 * @param client - Postgres client used to run the query.
 * @param aid - Video ID matched against the `aid` column.
 * @returns The first row returned by the query; `undefined` when the query returns no rows.
 */
export async function getVideoInfoFromAllData(client: Client, aid: number) {
    const { rows } = await client.queryObject<AllDataType>(`SELECT * FROM all_data WHERE aid = $1`, [aid]);
    return rows[0];
}

View File

@ -1,21 +1,20 @@
import { AutoTokenizer } from "@huggingface/transformers";
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "../mq/schema.ts";
// 模型路径和名称
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.11";
// 全局变量,用于存储模型和分词器
let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: any | null = null;
let tokenizer: PreTrainedTokenizer | null = null;
// 初始化分词器和ONNX会话
async function initializeModels() {
export async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) {
return; // 模型已加载,无需重复加载
return;
}
try {
@ -30,8 +29,8 @@ async function initializeModels() {
sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession;
} catch (error) {
console.error("Error initializing models:", error);
throw error; // 重新抛出错误,以便调用方处理
const e = new WorkerError(error as Error, "ml", "fn:initializeModels");
throw e;
}
}
@ -51,14 +50,12 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
return_tensor: false,
});
// 构造输入参数
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// 准备ONNX输入
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
@ -66,12 +63,11 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// 执行推理
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// 分类推理函数
async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
@ -85,13 +81,13 @@ async function runClassification(embeddings: number[]): Promise<number[]> {
return softmax(logits.data as Float32Array);
}
// 导出分类函数
export async function classifyVideo(
title: string,
description: string,
tags: string,
author_info: string,
): Promise<number[]> {
aid: number
): Promise<number> {
if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
}
@ -101,7 +97,7 @@ export async function classifyVideo(
tags,
author_info,
], sessionEmbedding);
const probabilities = await runClassification(embeddings);
return probabilities;
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml")
return probabilities.indexOf(Math.max(...probabilities));
}

View File

@ -0,0 +1,47 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts";
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
/**
 * Job processor that classifies one video and stores the resulting label.
 *
 * Reads `job.data.aid`, loads the video's row from `all_data`, runs
 * `classifyVideo` on its title/description/tags (with fixed fallbacks for
 * missing fields), writes the label via `insertVideoLabel`, and records the
 * label on the job's data.
 *
 * A label of -1 is logged as a classification failure but is still stored.
 *
 * @param job - Job whose `data.aid` identifies the video to classify.
 * @returns `3` when the job carries no `aid`; `0` once the label has been stored.
 * @throws Error when no `all_data` row exists for `aid`; errors from the
 *   database, the classifier, or `job.updateData` propagate to the caller.
 */
export const classifyVideoWorker = async (job: Job) => {
    const aid = job.data.aid;
    if (!aid) {
        // Bail out before borrowing a connection so nothing has to be released.
        return 3;
    }

    const client = await db.connect();
    try {
        const videoInfo = await getVideoInfoFromAllData(client, aid);
        if (!videoInfo) {
            // Fail the job with a clear message instead of a TypeError on `undefined`.
            throw new Error(`Video ${aid} not found in all_data`);
        }

        const title = videoInfo.title?.trim() || "untitled";
        const description = videoInfo.description?.trim() || "N/A";
        const tags = videoInfo.tags?.trim() || "empty";
        const authorInfo = "No";
        const label = await classifyVideo(title, description, tags, authorInfo, aid);
        if (label == -1) {
            logger.warn(`Failed to classify video ${aid}`, "ml");
        }

        // Await the insert: the client must not be released while the query is in flight,
        // and a failed insert should fail the job rather than become an unhandled rejection.
        await insertVideoLabel(client, aid, label);

        await job.updateData({
            ...job.data,
            label: label,
        });
        return 0;
    } finally {
        // Always hand the connection back, on success and on every error path.
        client.release();
    }
};
/**
 * Job processor that fans out classification work.
 *
 * Makes sure the models are initialized, reads every unlabeled video ID from
 * the database, and enqueues one "classifyVideo" job per ID on
 * `ClassifyVideoQueue`.
 *
 * Enqueueing stops once more than 200 jobs have been added in this run (the
 * guard is `i > 200`, so up to 201 jobs are enqueued).
 *
 * @returns `0` when every unlabeled video was enqueued; otherwise
 *   `10000 + <number of jobs enqueued>` when the per-run cap was hit.
 */
export const classifyVideosWorker = async () => {
    await initializeModels();

    const client = await db.connect();
    let videos: number[];
    try {
        videos = await getUnlabeledVideos(client);
    } finally {
        // Release the connection even when the query fails, so it is not leaked.
        client.release();
    }

    let i = 0;
    for (const aid of videos) {
        if (i > 200) return 10000 + i;
        await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
        i++;
    }
    return 0;
};

View File

@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts";
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
import { getVideoTags } from "lib/net/getVideoTags.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { WorkerError } from "src/worker.ts";
import { WorkerError } from "../schema.ts";
const delayMap = [0.5, 3, 5, 15, 30, 60];
const getJobPriority = (diff: number) => {

View File

@ -1,22 +1,19 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
async function configGetLatestVideos() {
export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
every: 1 * MINUTE
});
}
async function configGetVideosTags() {
await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
every: 30 * SECOND,
immediately: true,
});
}
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 30 * SECOND,
immediately: true,
})
export async function initMQ() {
await configGetLatestVideos();
await configGetVideosTags();
logger.log("Message queue initialized.");
}

12
lib/mq/schema.ts Normal file
View File

@ -0,0 +1,12 @@
/**
 * Error raised by workers that wraps an underlying error and tags it with the
 * service and code path it came from.
 *
 * The message is copied from the wrapped error and `name` is always
 * `"WorkerFailure"`.
 */
export class WorkerError extends Error {
    /** Label of the service that failed, when supplied. */
    service?: string;
    /** Label of the code location that failed, when supplied. */
    codePath?: string;
    /** The original error being wrapped. */
    rawError: Error;

    /**
     * @param rawError - Underlying error; its `message` becomes this error's message.
     * @param service - Optional service label.
     * @param codePath - Optional code-location label.
     */
    constructor(rawError: Error, service?: string, codePath?: string) {
        super(rawError.message);

        this.name = "WorkerFailure";
        this.codePath = codePath;
        this.service = service;
        this.rawError = rawError;
    }
}

View File

@ -1,7 +1,8 @@
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import { SECOND } from "$std/datetime/constants.ts";
import { VideoListVideo } from "lib/net/bilibili.d.ts";
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | AllDataType[]> {
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | VideoListVideo[]> {
const virtualPageSize = 50;
let lowPage = 1;
@ -10,16 +11,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
while (true) {
const ps = highPage < 2 ? 50 : 1
const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
const fetchTags = highPage < 2 ? true : false;
const videos = await getLatestVideos(pn, ps, 250, fetchTags);
const videos = await getLatestVideos(pn, ps);
if (!videos || videos.length === 0) {
break;
}
const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) {
if (!lastVideo || !lastVideo.pubdate) {
break;
}
const lastTime = Date.parse(lastVideo.published_at);
const lastTime = lastVideo.pubdate * SECOND
if (lastTime <= timestamp && highPage == 1) {
return videos;
}
@ -41,7 +41,7 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
let hi = highPage;
while (lo <= hi) {
const mid = Math.floor((lo + hi) / 2);
const videos = await getLatestVideos(mid * virtualPageSize, 1, 250, false);
const videos = await getLatestVideos(mid * virtualPageSize, 1);
if (!videos) {
return null;
}
@ -50,11 +50,11 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
continue;
}
const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) {
if (!lastVideo || !lastVideo.pubdate) {
hi = mid - 1;
continue;
}
const lastTime = Date.parse(lastVideo.published_at);
const lastTime = lastVideo.pubdate * SECOND
if (lastTime > timestamp) {
lo = mid + 1;
} else {
@ -63,15 +63,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
}
}
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize, 250, false);
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize);
let indexInPage = 0;
if (boundaryVideos && boundaryVideos.length > 0) {
for (let i = 0; i < boundaryVideos.length; i++) {
const video = boundaryVideos[i];
if (!video.published_at) {
if (!video.pubdate) {
continue;
}
const videoTime = Date.parse(video.published_at);
const videoTime = video.pubdate * SECOND
if (videoTime > timestamp) {
indexInPage++;
} else {

View File

@ -1,15 +1,10 @@
import { VideoListResponse } from "lib/net/bilibili.d.ts";
import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function getLatestVideos(
page: number = 1,
pageSize: number = 10,
sleepRate: number = 250,
fetchTags: boolean = true,
): Promise<AllDataType[] | null> {
pageSize: number = 10
): Promise<VideoListVideo[] | null> {
try {
const response = await fetch(
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
@ -26,18 +21,7 @@ export async function getLatestVideos(
return [];
}
return data.data.archives.map((video) => {
const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR);
return {
aid: video.aid,
bvid: video.bvid,
description: video.desc,
uid: video.owner.mid,
tags: null,
title: video.title,
published_at: published_at,
} as AllDataType;
});
return data.data.archives;
} catch (error) {
logger.error(error as Error, "net", "getLatestVideos");
return null;

View File

@ -9,7 +9,6 @@ import logger from "lib/log/logger.ts";
export async function insertLatestVideos(
client: Client,
pageSize: number = 10,
sleepRate: number = 250,
intervalRate: number = 4000,
): Promise<number | null> {
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
@ -27,7 +26,7 @@ export async function insertLatestVideos(
for (const video of videoIndex) {
const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) {
insertIntoAllData(client, video);
await insertIntoAllData(client, video);
}
}
return 0;
@ -37,7 +36,7 @@ export async function insertLatestVideos(
const insertedVideos = new Set();
while (true) {
try {
const videos = await getLatestVideos(page, pageSize, sleepRate);
const videos = await getLatestVideos(page, pageSize);
if (videos == null) {
failCount++;
if (failCount > 5) {
@ -53,7 +52,7 @@ export async function insertLatestVideos(
for (const video of videos) {
const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) {
insertIntoAllData(client, video);
await insertIntoAllData(client, video);
insertedVideos.add(video.aid);
}
}
@ -68,7 +67,7 @@ export async function insertLatestVideos(
if (failCount > 5) {
return null;
}
continue;
} finally {
await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
}

View File

@ -2,13 +2,13 @@ import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/");
createBullBoard({
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)],
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)],
serverAdapter: serverAdapter,
});

View File

@ -1,25 +1,26 @@
import { Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
import { WorkerError } from "src/worker.ts";
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "../lib/mq/schema.ts";
const filterWorker = new Worker(
"classifyVideo",
async (job: Job) => {
switch (job.name) {
case "classifyVideo":
return await getVideoTagsWorker(job);
return await classifyVideoWorker(job);
case "classifyVideos":
return await getVideoTagsInitializer();
return await classifyVideosWorker();
default:
break;
}
},
{ connection: redis, concurrency: 1, removeOnComplete: { count: 1440 } },
{ connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } },
);
filterWorker.on("active", () => {
logger.log("Worker activated.", "mq");
logger.log("Worker (filter) activated.", "mq");
});
filterWorker.on("error", (err) => {

View File

@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts";
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
import { lockManager } from "lib/mq/lockManager.ts";
/**
 * Error raised by workers that wraps an underlying error and tags it with the
 * service and code path it came from.
 *
 * The message is copied from the wrapped error and `name` is always
 * `"WorkerFailure"`.
 */
export class WorkerError extends Error {
    /** Label of the service that failed, when supplied. */
    service?: string;
    /** Label of the code location that failed, when supplied. */
    codePath?: string;
    /** The original error being wrapped. */
    rawError: Error;

    /**
     * @param rawError - Underlying error; its `message` becomes this error's message.
     * @param service - Optional service label.
     * @param codePath - Optional code-location label.
     */
    constructor(rawError: Error, service?: string, codePath?: string) {
        super(rawError.message);

        this.name = "WorkerFailure";
        this.codePath = codePath;
        this.service = service;
        this.rawError = rawError;
    }
}
import { WorkerError } from "../lib/mq/schema.ts";
Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -48,7 +36,7 @@ const latestVideoWorker = new Worker(
);
latestVideoWorker.on("active", () => {
logger.log("Worker activated.", "mq");
logger.log("Worker (latestVideos) activated.", "mq");
});
latestVideoWorker.on("error", (err) => {
@ -82,7 +70,7 @@ const videoTagsWorker = new Worker(
);
videoTagsWorker.on("active", () => {
logger.log("Worker activated.", "mq");
logger.log("Worker (videoTags) activated.", "mq");
});
videoTagsWorker.on("error", (err) => {