Compare commits
4 Commits
bfc622bfda
...
b14a43f63f
Author | SHA1 | Date | |
---|---|---|---|
b14a43f63f | |||
cecc1c1d2c | |||
7946cb6e96 | |||
4b48357ab6 |
@ -10,10 +10,11 @@
|
|||||||
"build": "deno run -A dev.ts build",
|
"build": "deno run -A dev.ts build",
|
||||||
"preview": "deno run -A main.ts",
|
"preview": "deno run -A main.ts",
|
||||||
"update": "deno run -A -r https://fresh.deno.dev/update .",
|
"update": "deno run -A -r https://fresh.deno.dev/update .",
|
||||||
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
|
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
|
||||||
|
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||||
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||||
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
|
"all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||||
},
|
},
|
||||||
"lint": {
|
"lint": {
|
||||||
|
@ -1,20 +1,37 @@
|
|||||||
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { AllDataType } from "lib/db/schema.d.ts";
|
import { AllDataType } from "lib/db/schema.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { formatTimestampToPsql, parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
|
import { VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||||
|
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
||||||
|
import { modelVersion } from "lib/ml/filter_inference.ts";
|
||||||
|
|
||||||
export async function videoExistsInAllData(client: Client, aid: number) {
|
export async function videoExistsInAllData(client: Client, aid: number) {
|
||||||
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
|
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
|
||||||
.then((result) => result.rows[0].exists);
|
.then((result) => result.rows[0].exists);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function insertIntoAllData(client: Client, data: AllDataType) {
|
export async function biliUserExists(client: Client, uid: number) {
|
||||||
|
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [uid])
|
||||||
|
.then((result) => result.rows[0].exists);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function insertIntoAllData(client: Client, data: VideoListVideo) {
|
||||||
logger.log(`inserted ${data.aid}`, "db-all_data");
|
logger.log(`inserted ${data.aid}`, "db-all_data");
|
||||||
return await client.queryObject(
|
await client.queryObject(
|
||||||
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at)
|
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
ON CONFLICT (aid) DO NOTHING`,
|
ON CONFLICT (aid) DO NOTHING`,
|
||||||
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
|
[
|
||||||
|
data.aid,
|
||||||
|
data.bvid,
|
||||||
|
data.desc,
|
||||||
|
data.owner.mid,
|
||||||
|
null,
|
||||||
|
data.title,
|
||||||
|
formatTimestampToPsql(data.pubdate * SECOND + 8 * HOUR),
|
||||||
|
data.duration,
|
||||||
|
],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -59,3 +76,25 @@ export async function getNullVideoTagsList(client: Client) {
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getUnlabeledVideos(client: Client) {
|
||||||
|
const queryResult = await client.queryObject<{ aid: number }>(
|
||||||
|
`SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
|
||||||
|
);
|
||||||
|
return queryResult.rows.map((row) => row.aid);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function insertVideoLabel(client: Client, aid: number, label: number) {
|
||||||
|
return await client.queryObject(
|
||||||
|
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
|
||||||
|
[aid, label, modelVersion],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getVideoInfoFromAllData(client: Client, aid: number) {
|
||||||
|
const queryResult = await client.queryObject<AllDataType>(
|
||||||
|
`SELECT * FROM all_data WHERE aid = $1`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
return queryResult.rows[0];
|
||||||
|
}
|
||||||
|
@ -1,21 +1,20 @@
|
|||||||
import { AutoTokenizer } from "@huggingface/transformers";
|
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
|
||||||
import * as ort from "onnxruntime";
|
import * as ort from "onnxruntime";
|
||||||
|
import logger from "lib/log/logger.ts";
|
||||||
|
import { WorkerError } from "../mq/schema.ts";
|
||||||
|
|
||||||
// 模型路径和名称
|
|
||||||
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
|
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
|
||||||
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
|
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
|
||||||
const onnxEmbeddingOriginalPath = "./model/model.onnx";
|
const onnxEmbeddingOriginalPath = "./model/model.onnx";
|
||||||
export const modelVersion = "3.11";
|
export const modelVersion = "3.11";
|
||||||
|
|
||||||
// 全局变量,用于存储模型和分词器
|
|
||||||
let sessionClassifier: ort.InferenceSession | null = null;
|
let sessionClassifier: ort.InferenceSession | null = null;
|
||||||
let sessionEmbedding: ort.InferenceSession | null = null;
|
let sessionEmbedding: ort.InferenceSession | null = null;
|
||||||
let tokenizer: any | null = null;
|
let tokenizer: PreTrainedTokenizer | null = null;
|
||||||
|
|
||||||
// 初始化分词器和ONNX会话
|
export async function initializeModels() {
|
||||||
async function initializeModels() {
|
|
||||||
if (tokenizer && sessionClassifier && sessionEmbedding) {
|
if (tokenizer && sessionClassifier && sessionEmbedding) {
|
||||||
return; // 模型已加载,无需重复加载
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -30,8 +29,8 @@ async function initializeModels() {
|
|||||||
sessionClassifier = classifierSession;
|
sessionClassifier = classifierSession;
|
||||||
sessionEmbedding = embeddingSession;
|
sessionEmbedding = embeddingSession;
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error initializing models:", error);
|
const e = new WorkerError(error as Error, "ml", "fn:initializeModels");
|
||||||
throw error; // 重新抛出错误,以便调用方处理
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -51,14 +50,12 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
|
|||||||
return_tensor: false,
|
return_tensor: false,
|
||||||
});
|
});
|
||||||
|
|
||||||
// 构造输入参数
|
|
||||||
const cumsum = (arr: number[]): number[] =>
|
const cumsum = (arr: number[]): number[] =>
|
||||||
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
|
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
|
||||||
|
|
||||||
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
|
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
|
||||||
const flattened_input_ids = input_ids.flat();
|
const flattened_input_ids = input_ids.flat();
|
||||||
|
|
||||||
// 准备ONNX输入
|
|
||||||
const inputs = {
|
const inputs = {
|
||||||
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
|
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
|
||||||
flattened_input_ids.length,
|
flattened_input_ids.length,
|
||||||
@ -66,12 +63,11 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
|
|||||||
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
|
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
|
||||||
};
|
};
|
||||||
|
|
||||||
// 执行推理
|
|
||||||
const { embeddings } = await session.run(inputs);
|
const { embeddings } = await session.run(inputs);
|
||||||
return Array.from(embeddings.data as Float32Array);
|
return Array.from(embeddings.data as Float32Array);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 分类推理函数
|
|
||||||
async function runClassification(embeddings: number[]): Promise<number[]> {
|
async function runClassification(embeddings: number[]): Promise<number[]> {
|
||||||
if (!sessionClassifier) {
|
if (!sessionClassifier) {
|
||||||
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
|
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
|
||||||
@ -85,13 +81,13 @@ async function runClassification(embeddings: number[]): Promise<number[]> {
|
|||||||
return softmax(logits.data as Float32Array);
|
return softmax(logits.data as Float32Array);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 导出分类函数
|
|
||||||
export async function classifyVideo(
|
export async function classifyVideo(
|
||||||
title: string,
|
title: string,
|
||||||
description: string,
|
description: string,
|
||||||
tags: string,
|
tags: string,
|
||||||
author_info: string,
|
author_info: string,
|
||||||
): Promise<number[]> {
|
aid: number
|
||||||
|
): Promise<number> {
|
||||||
if (!sessionEmbedding) {
|
if (!sessionEmbedding) {
|
||||||
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
|
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
|
||||||
}
|
}
|
||||||
@ -101,7 +97,7 @@ export async function classifyVideo(
|
|||||||
tags,
|
tags,
|
||||||
author_info,
|
author_info,
|
||||||
], sessionEmbedding);
|
], sessionEmbedding);
|
||||||
|
|
||||||
const probabilities = await runClassification(embeddings);
|
const probabilities = await runClassification(embeddings);
|
||||||
return probabilities;
|
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml")
|
||||||
|
return probabilities.indexOf(Math.max(...probabilities));
|
||||||
}
|
}
|
||||||
|
47
lib/mq/exec/classifyVideo.ts
Normal file
47
lib/mq/exec/classifyVideo.ts
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import { Job } from "bullmq";
|
||||||
|
import { db } from "lib/db/init.ts";
|
||||||
|
import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts";
|
||||||
|
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
|
||||||
|
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
||||||
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
|
export const classifyVideoWorker = async (job: Job) => {
|
||||||
|
const client = await db.connect();
|
||||||
|
const aid = job.data.aid;
|
||||||
|
if (!aid) {
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
const videoInfo = await getVideoInfoFromAllData(client, aid);
|
||||||
|
const title = videoInfo.title?.trim() || "untitled";
|
||||||
|
const description = videoInfo.description?.trim() || "N/A";
|
||||||
|
const tags = videoInfo.tags?.trim() || "empty";
|
||||||
|
const authorInfo = "No";
|
||||||
|
const label = await classifyVideo(title, description, tags, authorInfo, aid);
|
||||||
|
if (label == -1) {
|
||||||
|
logger.warn(`Failed to classify video ${aid}`, "ml");
|
||||||
|
}
|
||||||
|
insertVideoLabel(client, aid, label);
|
||||||
|
|
||||||
|
client.release();
|
||||||
|
|
||||||
|
job.updateData({
|
||||||
|
...job.data, label: label,
|
||||||
|
});
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const classifyVideosWorker = async () => {
|
||||||
|
await initializeModels();
|
||||||
|
const client = await db.connect();
|
||||||
|
const videos = await getUnlabeledVideos(client);
|
||||||
|
client.release();
|
||||||
|
let i = 0;
|
||||||
|
for (const aid of videos) {
|
||||||
|
if (i > 200) return 10000 + i;
|
||||||
|
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
};
|
@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts";
|
|||||||
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
|
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
|
||||||
import { getVideoTags } from "lib/net/getVideoTags.ts";
|
import { getVideoTags } from "lib/net/getVideoTags.ts";
|
||||||
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||||
import { WorkerError } from "src/worker.ts";
|
import { WorkerError } from "../schema.ts";
|
||||||
|
|
||||||
const delayMap = [0.5, 3, 5, 15, 30, 60];
|
const delayMap = [0.5, 3, 5, 15, 30, 60];
|
||||||
const getJobPriority = (diff: number) => {
|
const getJobPriority = (diff: number) => {
|
||||||
|
@ -1,22 +1,19 @@
|
|||||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
async function configGetLatestVideos() {
|
export async function initMQ() {
|
||||||
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
|
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
|
||||||
every: 1 * MINUTE,
|
every: 1 * MINUTE
|
||||||
});
|
});
|
||||||
}
|
|
||||||
|
|
||||||
async function configGetVideosTags() {
|
|
||||||
await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
|
await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
|
||||||
every: 30 * SECOND,
|
every: 30 * SECOND,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
}
|
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
|
||||||
|
every: 30 * SECOND,
|
||||||
|
immediately: true,
|
||||||
|
})
|
||||||
|
|
||||||
export async function initMQ() {
|
|
||||||
await configGetLatestVideos();
|
|
||||||
await configGetVideosTags();
|
|
||||||
logger.log("Message queue initialized.");
|
logger.log("Message queue initialized.");
|
||||||
}
|
}
|
||||||
|
12
lib/mq/schema.ts
Normal file
12
lib/mq/schema.ts
Normal file
@ -0,0 +1,12 @@
|
|||||||
|
export class WorkerError extends Error {
|
||||||
|
public service?: string;
|
||||||
|
public codePath?: string;
|
||||||
|
public rawError: Error;
|
||||||
|
constructor(rawError: Error, service?: string, codePath?: string) {
|
||||||
|
super(rawError.message);
|
||||||
|
this.name = "WorkerFailure";
|
||||||
|
this.codePath = codePath;
|
||||||
|
this.service = service;
|
||||||
|
this.rawError = rawError;
|
||||||
|
}
|
||||||
|
}
|
@ -1,7 +1,8 @@
|
|||||||
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
|
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
|
||||||
import { AllDataType } from "lib/db/schema.d.ts";
|
import { SECOND } from "$std/datetime/constants.ts";
|
||||||
|
import { VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||||
|
|
||||||
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | AllDataType[]> {
|
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | VideoListVideo[]> {
|
||||||
const virtualPageSize = 50;
|
const virtualPageSize = 50;
|
||||||
|
|
||||||
let lowPage = 1;
|
let lowPage = 1;
|
||||||
@ -10,16 +11,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
|||||||
while (true) {
|
while (true) {
|
||||||
const ps = highPage < 2 ? 50 : 1
|
const ps = highPage < 2 ? 50 : 1
|
||||||
const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
|
const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
|
||||||
const fetchTags = highPage < 2 ? true : false;
|
const videos = await getLatestVideos(pn, ps);
|
||||||
const videos = await getLatestVideos(pn, ps, 250, fetchTags);
|
|
||||||
if (!videos || videos.length === 0) {
|
if (!videos || videos.length === 0) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
const lastVideo = videos[videos.length - 1];
|
const lastVideo = videos[videos.length - 1];
|
||||||
if (!lastVideo || !lastVideo.published_at) {
|
if (!lastVideo || !lastVideo.pubdate) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
const lastTime = Date.parse(lastVideo.published_at);
|
const lastTime = lastVideo.pubdate * SECOND
|
||||||
if (lastTime <= timestamp && highPage == 1) {
|
if (lastTime <= timestamp && highPage == 1) {
|
||||||
return videos;
|
return videos;
|
||||||
}
|
}
|
||||||
@ -41,7 +41,7 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
|||||||
let hi = highPage;
|
let hi = highPage;
|
||||||
while (lo <= hi) {
|
while (lo <= hi) {
|
||||||
const mid = Math.floor((lo + hi) / 2);
|
const mid = Math.floor((lo + hi) / 2);
|
||||||
const videos = await getLatestVideos(mid * virtualPageSize, 1, 250, false);
|
const videos = await getLatestVideos(mid * virtualPageSize, 1);
|
||||||
if (!videos) {
|
if (!videos) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@ -50,11 +50,11 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const lastVideo = videos[videos.length - 1];
|
const lastVideo = videos[videos.length - 1];
|
||||||
if (!lastVideo || !lastVideo.published_at) {
|
if (!lastVideo || !lastVideo.pubdate) {
|
||||||
hi = mid - 1;
|
hi = mid - 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const lastTime = Date.parse(lastVideo.published_at);
|
const lastTime = lastVideo.pubdate * SECOND
|
||||||
if (lastTime > timestamp) {
|
if (lastTime > timestamp) {
|
||||||
lo = mid + 1;
|
lo = mid + 1;
|
||||||
} else {
|
} else {
|
||||||
@ -63,15 +63,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize, 250, false);
|
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize);
|
||||||
let indexInPage = 0;
|
let indexInPage = 0;
|
||||||
if (boundaryVideos && boundaryVideos.length > 0) {
|
if (boundaryVideos && boundaryVideos.length > 0) {
|
||||||
for (let i = 0; i < boundaryVideos.length; i++) {
|
for (let i = 0; i < boundaryVideos.length; i++) {
|
||||||
const video = boundaryVideos[i];
|
const video = boundaryVideos[i];
|
||||||
if (!video.published_at) {
|
if (!video.pubdate) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const videoTime = Date.parse(video.published_at);
|
const videoTime = video.pubdate * SECOND
|
||||||
if (videoTime > timestamp) {
|
if (videoTime > timestamp) {
|
||||||
indexInPage++;
|
indexInPage++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1,15 +1,10 @@
|
|||||||
import { VideoListResponse } from "lib/net/bilibili.d.ts";
|
import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||||
import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts";
|
|
||||||
import { AllDataType } from "lib/db/schema.d.ts";
|
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
|
||||||
|
|
||||||
export async function getLatestVideos(
|
export async function getLatestVideos(
|
||||||
page: number = 1,
|
page: number = 1,
|
||||||
pageSize: number = 10,
|
pageSize: number = 10
|
||||||
sleepRate: number = 250,
|
): Promise<VideoListVideo[] | null> {
|
||||||
fetchTags: boolean = true,
|
|
||||||
): Promise<AllDataType[] | null> {
|
|
||||||
try {
|
try {
|
||||||
const response = await fetch(
|
const response = await fetch(
|
||||||
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
|
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
|
||||||
@ -26,18 +21,7 @@ export async function getLatestVideos(
|
|||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
return data.data.archives.map((video) => {
|
return data.data.archives;
|
||||||
const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR);
|
|
||||||
return {
|
|
||||||
aid: video.aid,
|
|
||||||
bvid: video.bvid,
|
|
||||||
description: video.desc,
|
|
||||||
uid: video.owner.mid,
|
|
||||||
tags: null,
|
|
||||||
title: video.title,
|
|
||||||
published_at: published_at,
|
|
||||||
} as AllDataType;
|
|
||||||
});
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error as Error, "net", "getLatestVideos");
|
logger.error(error as Error, "net", "getLatestVideos");
|
||||||
return null;
|
return null;
|
||||||
|
@ -9,7 +9,6 @@ import logger from "lib/log/logger.ts";
|
|||||||
export async function insertLatestVideos(
|
export async function insertLatestVideos(
|
||||||
client: Client,
|
client: Client,
|
||||||
pageSize: number = 10,
|
pageSize: number = 10,
|
||||||
sleepRate: number = 250,
|
|
||||||
intervalRate: number = 4000,
|
intervalRate: number = 4000,
|
||||||
): Promise<number | null> {
|
): Promise<number | null> {
|
||||||
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
|
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
|
||||||
@ -27,7 +26,7 @@ export async function insertLatestVideos(
|
|||||||
for (const video of videoIndex) {
|
for (const video of videoIndex) {
|
||||||
const videoExists = await videoExistsInAllData(client, video.aid);
|
const videoExists = await videoExistsInAllData(client, video.aid);
|
||||||
if (!videoExists) {
|
if (!videoExists) {
|
||||||
insertIntoAllData(client, video);
|
await insertIntoAllData(client, video);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
@ -37,7 +36,7 @@ export async function insertLatestVideos(
|
|||||||
const insertedVideos = new Set();
|
const insertedVideos = new Set();
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
const videos = await getLatestVideos(page, pageSize, sleepRate);
|
const videos = await getLatestVideos(page, pageSize);
|
||||||
if (videos == null) {
|
if (videos == null) {
|
||||||
failCount++;
|
failCount++;
|
||||||
if (failCount > 5) {
|
if (failCount > 5) {
|
||||||
@ -53,7 +52,7 @@ export async function insertLatestVideos(
|
|||||||
for (const video of videos) {
|
for (const video of videos) {
|
||||||
const videoExists = await videoExistsInAllData(client, video.aid);
|
const videoExists = await videoExistsInAllData(client, video.aid);
|
||||||
if (!videoExists) {
|
if (!videoExists) {
|
||||||
insertIntoAllData(client, video);
|
await insertIntoAllData(client, video);
|
||||||
insertedVideos.add(video.aid);
|
insertedVideos.add(video.aid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -68,7 +67,7 @@ export async function insertLatestVideos(
|
|||||||
if (failCount > 5) {
|
if (failCount > 5) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
} finally {
|
} finally {
|
||||||
await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
|
await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
|
||||||
}
|
}
|
||||||
|
@ -2,13 +2,13 @@ import express from "express";
|
|||||||
import { createBullBoard } from "@bull-board/api";
|
import { createBullBoard } from "@bull-board/api";
|
||||||
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
||||||
import { ExpressAdapter } from "@bull-board/express";
|
import { ExpressAdapter } from "@bull-board/express";
|
||||||
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||||
|
|
||||||
const serverAdapter = new ExpressAdapter();
|
const serverAdapter = new ExpressAdapter();
|
||||||
serverAdapter.setBasePath("/");
|
serverAdapter.setBasePath("/");
|
||||||
|
|
||||||
createBullBoard({
|
createBullBoard({
|
||||||
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)],
|
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)],
|
||||||
serverAdapter: serverAdapter,
|
serverAdapter: serverAdapter,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -1,25 +1,26 @@
|
|||||||
import { Job, Worker } from "bullmq";
|
import { Job, Worker } from "bullmq";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "lib/db/redis.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { WorkerError } from "src/worker.ts";
|
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
|
||||||
|
import { WorkerError } from "../lib/mq/schema.ts";
|
||||||
|
|
||||||
const filterWorker = new Worker(
|
const filterWorker = new Worker(
|
||||||
"classifyVideo",
|
"classifyVideo",
|
||||||
async (job: Job) => {
|
async (job: Job) => {
|
||||||
switch (job.name) {
|
switch (job.name) {
|
||||||
case "classifyVideo":
|
case "classifyVideo":
|
||||||
return await getVideoTagsWorker(job);
|
return await classifyVideoWorker(job);
|
||||||
case "classifyVideos":
|
case "classifyVideos":
|
||||||
return await getVideoTagsInitializer();
|
return await classifyVideosWorker();
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis, concurrency: 1, removeOnComplete: { count: 1440 } },
|
{ connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } },
|
||||||
);
|
);
|
||||||
|
|
||||||
filterWorker.on("active", () => {
|
filterWorker.on("active", () => {
|
||||||
logger.log("Worker activated.", "mq");
|
logger.log("Worker (filter) activated.", "mq");
|
||||||
});
|
});
|
||||||
|
|
||||||
filterWorker.on("error", (err) => {
|
filterWorker.on("error", (err) => {
|
||||||
|
@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts";
|
|||||||
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
|
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
|
||||||
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
|
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "lib/mq/lockManager.ts";
|
||||||
|
import { WorkerError } from "../lib/mq/schema.ts";
|
||||||
export class WorkerError extends Error {
|
|
||||||
public service?: string;
|
|
||||||
public codePath?: string;
|
|
||||||
public rawError: Error;
|
|
||||||
constructor(rawError: Error, service?: string, codePath?: string) {
|
|
||||||
super(rawError.message);
|
|
||||||
this.name = "WorkerFailure";
|
|
||||||
this.codePath = codePath;
|
|
||||||
this.service = service;
|
|
||||||
this.rawError = rawError;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
@ -48,7 +36,7 @@ const latestVideoWorker = new Worker(
|
|||||||
);
|
);
|
||||||
|
|
||||||
latestVideoWorker.on("active", () => {
|
latestVideoWorker.on("active", () => {
|
||||||
logger.log("Worker activated.", "mq");
|
logger.log("Worker (latestVideos) activated.", "mq");
|
||||||
});
|
});
|
||||||
|
|
||||||
latestVideoWorker.on("error", (err) => {
|
latestVideoWorker.on("error", (err) => {
|
||||||
@ -82,7 +70,7 @@ const videoTagsWorker = new Worker(
|
|||||||
);
|
);
|
||||||
|
|
||||||
videoTagsWorker.on("active", () => {
|
videoTagsWorker.on("active", () => {
|
||||||
logger.log("Worker activated.", "mq");
|
logger.log("Worker (videoTags) activated.", "mq");
|
||||||
});
|
});
|
||||||
|
|
||||||
videoTagsWorker.on("error", (err) => {
|
videoTagsWorker.on("error", (err) => {
|
||||||
|
Loading…
Reference in New Issue
Block a user