Compare commits

...

4 Commits

13 changed files with 159 additions and 95 deletions

View File

@ -10,10 +10,11 @@
"build": "deno run -A dev.ts build", "build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts", "preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .", "update": "deno run -A -r https://fresh.deno.dev/update .",
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'", "all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
}, },
"lint": { "lint": {

View File

@ -1,20 +1,37 @@
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType } from "lib/db/schema.d.ts"; import { AllDataType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; import { formatTimestampToPsql, parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { VideoListVideo } from "lib/net/bilibili.d.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
import { modelVersion } from "lib/ml/filter_inference.ts";
export async function videoExistsInAllData(client: Client, aid: number) { export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid]) return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
.then((result) => result.rows[0].exists); .then((result) => result.rows[0].exists);
} }
export async function insertIntoAllData(client: Client, data: AllDataType) { export async function biliUserExists(client: Client, uid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [uid])
.then((result) => result.rows[0].exists);
}
export async function insertIntoAllData(client: Client, data: VideoListVideo) {
logger.log(`inserted ${data.aid}`, "db-all_data"); logger.log(`inserted ${data.aid}`, "db-all_data");
return await client.queryObject( await client.queryObject(
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
VALUES ($1, $2, $3, $4, $5, $6, $7) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (aid) DO NOTHING`, ON CONFLICT (aid) DO NOTHING`,
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at], [
data.aid,
data.bvid,
data.desc,
data.owner.mid,
null,
data.title,
formatTimestampToPsql(data.pubdate * SECOND + 8 * HOUR),
data.duration,
],
); );
} }
@ -59,3 +76,25 @@ export async function getNullVideoTagsList(client: Client) {
}, },
); );
} }
export async function getUnlabeledVideos(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(
`SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
);
return queryResult.rows.map((row) => row.aid);
}
export async function insertVideoLabel(client: Client, aid: number, label: number) {
return await client.queryObject(
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
[aid, label, modelVersion],
);
}
export async function getVideoInfoFromAllData(client: Client, aid: number) {
const queryResult = await client.queryObject<AllDataType>(
`SELECT * FROM all_data WHERE aid = $1`,
[aid],
);
return queryResult.rows[0];
}

View File

@ -1,21 +1,20 @@
import { AutoTokenizer } from "@huggingface/transformers"; import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime"; import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "../mq/schema.ts";
// 模型路径和名称
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024"; const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx"; const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx"; const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.11"; export const modelVersion = "3.11";
// 全局变量,用于存储模型和分词器
let sessionClassifier: ort.InferenceSession | null = null; let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null; let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: any | null = null; let tokenizer: PreTrainedTokenizer | null = null;
// 初始化分词器和ONNX会话 export async function initializeModels() {
async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) { if (tokenizer && sessionClassifier && sessionEmbedding) {
return; // 模型已加载,无需重复加载 return;
} }
try { try {
@ -30,8 +29,8 @@ async function initializeModels() {
sessionClassifier = classifierSession; sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession; sessionEmbedding = embeddingSession;
} catch (error) { } catch (error) {
console.error("Error initializing models:", error); const e = new WorkerError(error as Error, "ml", "fn:initializeModels");
throw error; // 重新抛出错误,以便调用方处理 throw e;
} }
} }
@ -51,14 +50,12 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
return_tensor: false, return_tensor: false,
}); });
// 构造输入参数
const cumsum = (arr: number[]): number[] => const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []); arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))]; const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat(); const flattened_input_ids = input_ids.flat();
// 准备ONNX输入
const inputs = { const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length, flattened_input_ids.length,
@ -66,12 +63,11 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
}; };
// 执行推理
const { embeddings } = await session.run(inputs); const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array); return Array.from(embeddings.data as Float32Array);
} }
// 分类推理函数
async function runClassification(embeddings: number[]): Promise<number[]> { async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) { if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first."); throw new Error("Classifier session is not initialized. Call initializeModels() first.");
@ -85,13 +81,13 @@ async function runClassification(embeddings: number[]): Promise<number[]> {
return softmax(logits.data as Float32Array); return softmax(logits.data as Float32Array);
} }
// 导出分类函数
export async function classifyVideo( export async function classifyVideo(
title: string, title: string,
description: string, description: string,
tags: string, tags: string,
author_info: string, author_info: string,
): Promise<number[]> { aid: number
): Promise<number> {
if (!sessionEmbedding) { if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first."); throw new Error("Embedding session is not initialized. Call initializeModels() first.");
} }
@ -101,7 +97,7 @@ export async function classifyVideo(
tags, tags,
author_info, author_info,
], sessionEmbedding); ], sessionEmbedding);
const probabilities = await runClassification(embeddings); const probabilities = await runClassification(embeddings);
return probabilities; logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml")
return probabilities.indexOf(Math.max(...probabilities));
} }

View File

@ -0,0 +1,47 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts";
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect();
const aid = job.data.aid;
if (!aid) {
return 3;
}
const videoInfo = await getVideoInfoFromAllData(client, aid);
const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A";
const tags = videoInfo.tags?.trim() || "empty";
const authorInfo = "No";
const label = await classifyVideo(title, description, tags, authorInfo, aid);
if (label == -1) {
logger.warn(`Failed to classify video ${aid}`, "ml");
}
insertVideoLabel(client, aid, label);
client.release();
job.updateData({
...job.data, label: label,
});
return 0;
};
export const classifyVideosWorker = async () => {
await initializeModels();
const client = await db.connect();
const videos = await getUnlabeledVideos(client);
client.release();
let i = 0;
for (const aid of videos) {
if (i > 200) return 10000 + i;
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
i++;
}
return 0;
};

View File

@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts";
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts"; import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
import { getVideoTags } from "lib/net/getVideoTags.ts"; import { getVideoTags } from "lib/net/getVideoTags.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { WorkerError } from "src/worker.ts"; import { WorkerError } from "../schema.ts";
const delayMap = [0.5, 3, 5, 15, 30, 60]; const delayMap = [0.5, 3, 5, 15, 30, 60];
const getJobPriority = (diff: number) => { const getJobPriority = (diff: number) => {

View File

@ -1,22 +1,19 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
async function configGetLatestVideos() { export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE, every: 1 * MINUTE
}); });
}
async function configGetVideosTags() {
await VideoTagsQueue.upsertJobScheduler("getVideosTags", { await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
every: 30 * SECOND, every: 30 * SECOND,
immediately: true, immediately: true,
}); });
} await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 30 * SECOND,
immediately: true,
})
export async function initMQ() {
await configGetLatestVideos();
await configGetVideosTags();
logger.log("Message queue initialized."); logger.log("Message queue initialized.");
} }

12
lib/mq/schema.ts Normal file
View File

@ -0,0 +1,12 @@
export class WorkerError extends Error {
public service?: string;
public codePath?: string;
public rawError: Error;
constructor(rawError: Error, service?: string, codePath?: string) {
super(rawError.message);
this.name = "WorkerFailure";
this.codePath = codePath;
this.service = service;
this.rawError = rawError;
}
}

View File

@ -1,7 +1,8 @@
import { getLatestVideos } from "lib/net/getLatestVideos.ts"; import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { AllDataType } from "lib/db/schema.d.ts"; import { SECOND } from "$std/datetime/constants.ts";
import { VideoListVideo } from "lib/net/bilibili.d.ts";
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | AllDataType[]> { export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | VideoListVideo[]> {
const virtualPageSize = 50; const virtualPageSize = 50;
let lowPage = 1; let lowPage = 1;
@ -10,16 +11,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
while (true) { while (true) {
const ps = highPage < 2 ? 50 : 1 const ps = highPage < 2 ? 50 : 1
const pn = highPage < 2 ? 1 : highPage * virtualPageSize; const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
const fetchTags = highPage < 2 ? true : false; const videos = await getLatestVideos(pn, ps);
const videos = await getLatestVideos(pn, ps, 250, fetchTags);
if (!videos || videos.length === 0) { if (!videos || videos.length === 0) {
break; break;
} }
const lastVideo = videos[videos.length - 1]; const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) { if (!lastVideo || !lastVideo.pubdate) {
break; break;
} }
const lastTime = Date.parse(lastVideo.published_at); const lastTime = lastVideo.pubdate * SECOND
if (lastTime <= timestamp && highPage == 1) { if (lastTime <= timestamp && highPage == 1) {
return videos; return videos;
} }
@ -41,7 +41,7 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
let hi = highPage; let hi = highPage;
while (lo <= hi) { while (lo <= hi) {
const mid = Math.floor((lo + hi) / 2); const mid = Math.floor((lo + hi) / 2);
const videos = await getLatestVideos(mid * virtualPageSize, 1, 250, false); const videos = await getLatestVideos(mid * virtualPageSize, 1);
if (!videos) { if (!videos) {
return null; return null;
} }
@ -50,11 +50,11 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
continue; continue;
} }
const lastVideo = videos[videos.length - 1]; const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) { if (!lastVideo || !lastVideo.pubdate) {
hi = mid - 1; hi = mid - 1;
continue; continue;
} }
const lastTime = Date.parse(lastVideo.published_at); const lastTime = lastVideo.pubdate * SECOND
if (lastTime > timestamp) { if (lastTime > timestamp) {
lo = mid + 1; lo = mid + 1;
} else { } else {
@ -63,15 +63,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
} }
} }
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize, 250, false); const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize);
let indexInPage = 0; let indexInPage = 0;
if (boundaryVideos && boundaryVideos.length > 0) { if (boundaryVideos && boundaryVideos.length > 0) {
for (let i = 0; i < boundaryVideos.length; i++) { for (let i = 0; i < boundaryVideos.length; i++) {
const video = boundaryVideos[i]; const video = boundaryVideos[i];
if (!video.published_at) { if (!video.pubdate) {
continue; continue;
} }
const videoTime = Date.parse(video.published_at); const videoTime = video.pubdate * SECOND
if (videoTime > timestamp) { if (videoTime > timestamp) {
indexInPage++; indexInPage++;
} else { } else {

View File

@ -1,15 +1,10 @@
import { VideoListResponse } from "lib/net/bilibili.d.ts"; import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts";
import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function getLatestVideos( export async function getLatestVideos(
page: number = 1, page: number = 1,
pageSize: number = 10, pageSize: number = 10
sleepRate: number = 250, ): Promise<VideoListVideo[] | null> {
fetchTags: boolean = true,
): Promise<AllDataType[] | null> {
try { try {
const response = await fetch( const response = await fetch(
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`, `https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
@ -26,18 +21,7 @@ export async function getLatestVideos(
return []; return [];
} }
return data.data.archives.map((video) => { return data.data.archives;
const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR);
return {
aid: video.aid,
bvid: video.bvid,
description: video.desc,
uid: video.owner.mid,
tags: null,
title: video.title,
published_at: published_at,
} as AllDataType;
});
} catch (error) { } catch (error) {
logger.error(error as Error, "net", "getLatestVideos"); logger.error(error as Error, "net", "getLatestVideos");
return null; return null;

View File

@ -9,7 +9,6 @@ import logger from "lib/log/logger.ts";
export async function insertLatestVideos( export async function insertLatestVideos(
client: Client, client: Client,
pageSize: number = 10, pageSize: number = 10,
sleepRate: number = 250,
intervalRate: number = 4000, intervalRate: number = 4000,
): Promise<number | null> { ): Promise<number | null> {
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client); const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
@ -27,7 +26,7 @@ export async function insertLatestVideos(
for (const video of videoIndex) { for (const video of videoIndex) {
const videoExists = await videoExistsInAllData(client, video.aid); const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) { if (!videoExists) {
insertIntoAllData(client, video); await insertIntoAllData(client, video);
} }
} }
return 0; return 0;
@ -37,7 +36,7 @@ export async function insertLatestVideos(
const insertedVideos = new Set(); const insertedVideos = new Set();
while (true) { while (true) {
try { try {
const videos = await getLatestVideos(page, pageSize, sleepRate); const videos = await getLatestVideos(page, pageSize);
if (videos == null) { if (videos == null) {
failCount++; failCount++;
if (failCount > 5) { if (failCount > 5) {
@ -53,7 +52,7 @@ export async function insertLatestVideos(
for (const video of videos) { for (const video of videos) {
const videoExists = await videoExistsInAllData(client, video.aid); const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) { if (!videoExists) {
insertIntoAllData(client, video); await insertIntoAllData(client, video);
insertedVideos.add(video.aid); insertedVideos.add(video.aid);
} }
} }
@ -68,7 +67,7 @@ export async function insertLatestVideos(
if (failCount > 5) { if (failCount > 5) {
return null; return null;
} }
continue;
} finally { } finally {
await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND); await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
} }

View File

@ -2,13 +2,13 @@ import express from "express";
import { createBullBoard } from "@bull-board/api"; import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express"; import { ExpressAdapter } from "@bull-board/express";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter(); const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/"); serverAdapter.setBasePath("/");
createBullBoard({ createBullBoard({
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)], queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)],
serverAdapter: serverAdapter, serverAdapter: serverAdapter,
}); });

View File

@ -1,25 +1,26 @@
import { Job, Worker } from "bullmq"; import { Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { WorkerError } from "src/worker.ts"; import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "../lib/mq/schema.ts";
const filterWorker = new Worker( const filterWorker = new Worker(
"classifyVideo", "classifyVideo",
async (job: Job) => { async (job: Job) => {
switch (job.name) { switch (job.name) {
case "classifyVideo": case "classifyVideo":
return await getVideoTagsWorker(job); return await classifyVideoWorker(job);
case "classifyVideos": case "classifyVideos":
return await getVideoTagsInitializer(); return await classifyVideosWorker();
default: default:
break; break;
} }
}, },
{ connection: redis, concurrency: 1, removeOnComplete: { count: 1440 } }, { connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } },
); );
filterWorker.on("active", () => { filterWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (filter) activated.", "mq");
}); });
filterWorker.on("error", (err) => { filterWorker.on("error", (err) => {

View File

@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts";
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "../lib/mq/schema.ts";
export class WorkerError extends Error {
public service?: string;
public codePath?: string;
public rawError: Error;
constructor(rawError: Error, service?: string, codePath?: string) {
super(rawError.message);
this.name = "WorkerFailure";
this.codePath = codePath;
this.service = service;
this.rawError = rawError;
}
}
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq"); logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -48,7 +36,7 @@ const latestVideoWorker = new Worker(
); );
latestVideoWorker.on("active", () => { latestVideoWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (latestVideos) activated.", "mq");
}); });
latestVideoWorker.on("error", (err) => { latestVideoWorker.on("error", (err) => {
@ -82,7 +70,7 @@ const videoTagsWorker = new Worker(
); );
videoTagsWorker.on("active", () => { videoTagsWorker.on("active", () => {
logger.log("Worker activated.", "mq"); logger.log("Worker (videoTags) activated.", "mq");
}); });
videoTagsWorker.on("error", (err) => { videoTagsWorker.on("error", (err) => {