Compare commits
4 Commits
bfc622bfda
...
b14a43f63f
Author | SHA1 | Date | |
---|---|---|---|
b14a43f63f | |||
cecc1c1d2c | |||
7946cb6e96 | |||
4b48357ab6 |
@ -10,10 +10,11 @@
|
||||
"build": "deno run -A dev.ts build",
|
||||
"preview": "deno run -A main.ts",
|
||||
"update": "deno run -A -r https://fresh.deno.dev/update .",
|
||||
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
|
||||
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
|
||||
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
|
||||
"all": "concurrently 'deno task start' 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||
},
|
||||
"lint": {
|
||||
|
@ -1,20 +1,37 @@
|
||||
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { AllDataType } from "lib/db/schema.d.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import { formatTimestampToPsql, parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import { VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
||||
import { modelVersion } from "lib/ml/filter_inference.ts";
|
||||
|
||||
export async function videoExistsInAllData(client: Client, aid: number) {
|
||||
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
|
||||
.then((result) => result.rows[0].exists);
|
||||
}
|
||||
|
||||
export async function insertIntoAllData(client: Client, data: AllDataType) {
|
||||
export async function biliUserExists(client: Client, uid: number) {
|
||||
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [uid])
|
||||
.then((result) => result.rows[0].exists);
|
||||
}
|
||||
|
||||
export async function insertIntoAllData(client: Client, data: VideoListVideo) {
|
||||
logger.log(`inserted ${data.aid}`, "db-all_data");
|
||||
return await client.queryObject(
|
||||
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
await client.queryObject(
|
||||
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
ON CONFLICT (aid) DO NOTHING`,
|
||||
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
|
||||
[
|
||||
data.aid,
|
||||
data.bvid,
|
||||
data.desc,
|
||||
data.owner.mid,
|
||||
null,
|
||||
data.title,
|
||||
formatTimestampToPsql(data.pubdate * SECOND + 8 * HOUR),
|
||||
data.duration,
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
@ -59,3 +76,25 @@ export async function getNullVideoTagsList(client: Client) {
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
export async function getUnlabeledVideos(client: Client) {
|
||||
const queryResult = await client.queryObject<{ aid: number }>(
|
||||
`SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
|
||||
);
|
||||
return queryResult.rows.map((row) => row.aid);
|
||||
}
|
||||
|
||||
export async function insertVideoLabel(client: Client, aid: number, label: number) {
|
||||
return await client.queryObject(
|
||||
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
|
||||
[aid, label, modelVersion],
|
||||
);
|
||||
}
|
||||
|
||||
export async function getVideoInfoFromAllData(client: Client, aid: number) {
|
||||
const queryResult = await client.queryObject<AllDataType>(
|
||||
`SELECT * FROM all_data WHERE aid = $1`,
|
||||
[aid],
|
||||
);
|
||||
return queryResult.rows[0];
|
||||
}
|
||||
|
@ -1,21 +1,20 @@
|
||||
import { AutoTokenizer } from "@huggingface/transformers";
|
||||
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
|
||||
import * as ort from "onnxruntime";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { WorkerError } from "../mq/schema.ts";
|
||||
|
||||
// 模型路径和名称
|
||||
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
|
||||
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
|
||||
const onnxEmbeddingOriginalPath = "./model/model.onnx";
|
||||
export const modelVersion = "3.11";
|
||||
|
||||
// 全局变量,用于存储模型和分词器
|
||||
let sessionClassifier: ort.InferenceSession | null = null;
|
||||
let sessionEmbedding: ort.InferenceSession | null = null;
|
||||
let tokenizer: any | null = null;
|
||||
let tokenizer: PreTrainedTokenizer | null = null;
|
||||
|
||||
// 初始化分词器和ONNX会话
|
||||
async function initializeModels() {
|
||||
export async function initializeModels() {
|
||||
if (tokenizer && sessionClassifier && sessionEmbedding) {
|
||||
return; // 模型已加载,无需重复加载
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
@ -30,8 +29,8 @@ async function initializeModels() {
|
||||
sessionClassifier = classifierSession;
|
||||
sessionEmbedding = embeddingSession;
|
||||
} catch (error) {
|
||||
console.error("Error initializing models:", error);
|
||||
throw error; // 重新抛出错误,以便调用方处理
|
||||
const e = new WorkerError(error as Error, "ml", "fn:initializeModels");
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,14 +50,12 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
|
||||
return_tensor: false,
|
||||
});
|
||||
|
||||
// 构造输入参数
|
||||
const cumsum = (arr: number[]): number[] =>
|
||||
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
|
||||
|
||||
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
|
||||
const flattened_input_ids = input_ids.flat();
|
||||
|
||||
// 准备ONNX输入
|
||||
const inputs = {
|
||||
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
|
||||
flattened_input_ids.length,
|
||||
@ -66,12 +63,11 @@ async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession)
|
||||
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
|
||||
};
|
||||
|
||||
// 执行推理
|
||||
const { embeddings } = await session.run(inputs);
|
||||
return Array.from(embeddings.data as Float32Array);
|
||||
}
|
||||
|
||||
// 分类推理函数
|
||||
|
||||
async function runClassification(embeddings: number[]): Promise<number[]> {
|
||||
if (!sessionClassifier) {
|
||||
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
|
||||
@ -85,13 +81,13 @@ async function runClassification(embeddings: number[]): Promise<number[]> {
|
||||
return softmax(logits.data as Float32Array);
|
||||
}
|
||||
|
||||
// 导出分类函数
|
||||
export async function classifyVideo(
|
||||
title: string,
|
||||
description: string,
|
||||
tags: string,
|
||||
author_info: string,
|
||||
): Promise<number[]> {
|
||||
aid: number
|
||||
): Promise<number> {
|
||||
if (!sessionEmbedding) {
|
||||
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
|
||||
}
|
||||
@ -101,7 +97,7 @@ export async function classifyVideo(
|
||||
tags,
|
||||
author_info,
|
||||
], sessionEmbedding);
|
||||
|
||||
const probabilities = await runClassification(embeddings);
|
||||
return probabilities;
|
||||
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml")
|
||||
return probabilities.indexOf(Math.max(...probabilities));
|
||||
}
|
||||
|
47
lib/mq/exec/classifyVideo.ts
Normal file
47
lib/mq/exec/classifyVideo.ts
Normal file
@ -0,0 +1,47 @@
|
||||
import { Job } from "bullmq";
|
||||
import { db } from "lib/db/init.ts";
|
||||
import { getUnlabeledVideos, getVideoInfoFromAllData, insertVideoLabel} from "lib/db/allData.ts";
|
||||
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";
|
||||
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
|
||||
export const classifyVideoWorker = async (job: Job) => {
|
||||
const client = await db.connect();
|
||||
const aid = job.data.aid;
|
||||
if (!aid) {
|
||||
return 3;
|
||||
}
|
||||
|
||||
const videoInfo = await getVideoInfoFromAllData(client, aid);
|
||||
const title = videoInfo.title?.trim() || "untitled";
|
||||
const description = videoInfo.description?.trim() || "N/A";
|
||||
const tags = videoInfo.tags?.trim() || "empty";
|
||||
const authorInfo = "No";
|
||||
const label = await classifyVideo(title, description, tags, authorInfo, aid);
|
||||
if (label == -1) {
|
||||
logger.warn(`Failed to classify video ${aid}`, "ml");
|
||||
}
|
||||
insertVideoLabel(client, aid, label);
|
||||
|
||||
client.release();
|
||||
|
||||
job.updateData({
|
||||
...job.data, label: label,
|
||||
});
|
||||
|
||||
return 0;
|
||||
};
|
||||
|
||||
export const classifyVideosWorker = async () => {
|
||||
await initializeModels();
|
||||
const client = await db.connect();
|
||||
const videos = await getUnlabeledVideos(client);
|
||||
client.release();
|
||||
let i = 0;
|
||||
for (const aid of videos) {
|
||||
if (i > 200) return 10000 + i;
|
||||
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
|
||||
i++;
|
||||
}
|
||||
return 0;
|
||||
};
|
@ -8,7 +8,7 @@ import logger from "lib/log/logger.ts";
|
||||
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
|
||||
import { getVideoTags } from "lib/net/getVideoTags.ts";
|
||||
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||
import { WorkerError } from "src/worker.ts";
|
||||
import { WorkerError } from "../schema.ts";
|
||||
|
||||
const delayMap = [0.5, 3, 5, 15, 30, 60];
|
||||
const getJobPriority = (diff: number) => {
|
||||
|
@ -1,22 +1,19 @@
|
||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
|
||||
async function configGetLatestVideos() {
|
||||
export async function initMQ() {
|
||||
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
|
||||
every: 1 * MINUTE,
|
||||
every: 1 * MINUTE
|
||||
});
|
||||
}
|
||||
|
||||
async function configGetVideosTags() {
|
||||
await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
|
||||
every: 30 * SECOND,
|
||||
immediately: true,
|
||||
});
|
||||
}
|
||||
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
|
||||
every: 30 * SECOND,
|
||||
immediately: true,
|
||||
})
|
||||
|
||||
export async function initMQ() {
|
||||
await configGetLatestVideos();
|
||||
await configGetVideosTags();
|
||||
logger.log("Message queue initialized.");
|
||||
}
|
||||
|
12
lib/mq/schema.ts
Normal file
12
lib/mq/schema.ts
Normal file
@ -0,0 +1,12 @@
|
||||
export class WorkerError extends Error {
|
||||
public service?: string;
|
||||
public codePath?: string;
|
||||
public rawError: Error;
|
||||
constructor(rawError: Error, service?: string, codePath?: string) {
|
||||
super(rawError.message);
|
||||
this.name = "WorkerFailure";
|
||||
this.codePath = codePath;
|
||||
this.service = service;
|
||||
this.rawError = rawError;
|
||||
}
|
||||
}
|
@ -1,7 +1,8 @@
|
||||
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
|
||||
import { AllDataType } from "lib/db/schema.d.ts";
|
||||
import { SECOND } from "$std/datetime/constants.ts";
|
||||
import { VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||
|
||||
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | AllDataType[]> {
|
||||
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | VideoListVideo[]> {
|
||||
const virtualPageSize = 50;
|
||||
|
||||
let lowPage = 1;
|
||||
@ -10,16 +11,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
||||
while (true) {
|
||||
const ps = highPage < 2 ? 50 : 1
|
||||
const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
|
||||
const fetchTags = highPage < 2 ? true : false;
|
||||
const videos = await getLatestVideos(pn, ps, 250, fetchTags);
|
||||
const videos = await getLatestVideos(pn, ps);
|
||||
if (!videos || videos.length === 0) {
|
||||
break;
|
||||
}
|
||||
const lastVideo = videos[videos.length - 1];
|
||||
if (!lastVideo || !lastVideo.published_at) {
|
||||
if (!lastVideo || !lastVideo.pubdate) {
|
||||
break;
|
||||
}
|
||||
const lastTime = Date.parse(lastVideo.published_at);
|
||||
const lastTime = lastVideo.pubdate * SECOND
|
||||
if (lastTime <= timestamp && highPage == 1) {
|
||||
return videos;
|
||||
}
|
||||
@ -41,7 +41,7 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
||||
let hi = highPage;
|
||||
while (lo <= hi) {
|
||||
const mid = Math.floor((lo + hi) / 2);
|
||||
const videos = await getLatestVideos(mid * virtualPageSize, 1, 250, false);
|
||||
const videos = await getLatestVideos(mid * virtualPageSize, 1);
|
||||
if (!videos) {
|
||||
return null;
|
||||
}
|
||||
@ -50,11 +50,11 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
||||
continue;
|
||||
}
|
||||
const lastVideo = videos[videos.length - 1];
|
||||
if (!lastVideo || !lastVideo.published_at) {
|
||||
if (!lastVideo || !lastVideo.pubdate) {
|
||||
hi = mid - 1;
|
||||
continue;
|
||||
}
|
||||
const lastTime = Date.parse(lastVideo.published_at);
|
||||
const lastTime = lastVideo.pubdate * SECOND
|
||||
if (lastTime > timestamp) {
|
||||
lo = mid + 1;
|
||||
} else {
|
||||
@ -63,15 +63,15 @@ export async function getVideoPositionInNewList(timestamp: number): Promise<numb
|
||||
}
|
||||
}
|
||||
|
||||
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize, 250, false);
|
||||
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize);
|
||||
let indexInPage = 0;
|
||||
if (boundaryVideos && boundaryVideos.length > 0) {
|
||||
for (let i = 0; i < boundaryVideos.length; i++) {
|
||||
const video = boundaryVideos[i];
|
||||
if (!video.published_at) {
|
||||
if (!video.pubdate) {
|
||||
continue;
|
||||
}
|
||||
const videoTime = Date.parse(video.published_at);
|
||||
const videoTime = video.pubdate * SECOND
|
||||
if (videoTime > timestamp) {
|
||||
indexInPage++;
|
||||
} else {
|
||||
|
@ -1,15 +1,10 @@
|
||||
import { VideoListResponse } from "lib/net/bilibili.d.ts";
|
||||
import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import { AllDataType } from "lib/db/schema.d.ts";
|
||||
import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
||||
|
||||
export async function getLatestVideos(
|
||||
page: number = 1,
|
||||
pageSize: number = 10,
|
||||
sleepRate: number = 250,
|
||||
fetchTags: boolean = true,
|
||||
): Promise<AllDataType[] | null> {
|
||||
pageSize: number = 10
|
||||
): Promise<VideoListVideo[] | null> {
|
||||
try {
|
||||
const response = await fetch(
|
||||
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
|
||||
@ -26,18 +21,7 @@ export async function getLatestVideos(
|
||||
return [];
|
||||
}
|
||||
|
||||
return data.data.archives.map((video) => {
|
||||
const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR);
|
||||
return {
|
||||
aid: video.aid,
|
||||
bvid: video.bvid,
|
||||
description: video.desc,
|
||||
uid: video.owner.mid,
|
||||
tags: null,
|
||||
title: video.title,
|
||||
published_at: published_at,
|
||||
} as AllDataType;
|
||||
});
|
||||
return data.data.archives;
|
||||
} catch (error) {
|
||||
logger.error(error as Error, "net", "getLatestVideos");
|
||||
return null;
|
||||
|
@ -9,7 +9,6 @@ import logger from "lib/log/logger.ts";
|
||||
export async function insertLatestVideos(
|
||||
client: Client,
|
||||
pageSize: number = 10,
|
||||
sleepRate: number = 250,
|
||||
intervalRate: number = 4000,
|
||||
): Promise<number | null> {
|
||||
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
|
||||
@ -27,7 +26,7 @@ export async function insertLatestVideos(
|
||||
for (const video of videoIndex) {
|
||||
const videoExists = await videoExistsInAllData(client, video.aid);
|
||||
if (!videoExists) {
|
||||
insertIntoAllData(client, video);
|
||||
await insertIntoAllData(client, video);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
@ -37,7 +36,7 @@ export async function insertLatestVideos(
|
||||
const insertedVideos = new Set();
|
||||
while (true) {
|
||||
try {
|
||||
const videos = await getLatestVideos(page, pageSize, sleepRate);
|
||||
const videos = await getLatestVideos(page, pageSize);
|
||||
if (videos == null) {
|
||||
failCount++;
|
||||
if (failCount > 5) {
|
||||
@ -53,7 +52,7 @@ export async function insertLatestVideos(
|
||||
for (const video of videos) {
|
||||
const videoExists = await videoExistsInAllData(client, video.aid);
|
||||
if (!videoExists) {
|
||||
insertIntoAllData(client, video);
|
||||
await insertIntoAllData(client, video);
|
||||
insertedVideos.add(video.aid);
|
||||
}
|
||||
}
|
||||
@ -68,7 +67,7 @@ export async function insertLatestVideos(
|
||||
if (failCount > 5) {
|
||||
return null;
|
||||
}
|
||||
continue;
|
||||
|
||||
} finally {
|
||||
await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
|
||||
}
|
||||
|
@ -2,13 +2,13 @@ import express from "express";
|
||||
import { createBullBoard } from "@bull-board/api";
|
||||
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
||||
import { ExpressAdapter } from "@bull-board/express";
|
||||
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
|
||||
|
||||
const serverAdapter = new ExpressAdapter();
|
||||
serverAdapter.setBasePath("/");
|
||||
|
||||
createBullBoard({
|
||||
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)],
|
||||
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue), new BullMQAdapter(ClassifyVideoQueue)],
|
||||
serverAdapter: serverAdapter,
|
||||
});
|
||||
|
||||
|
@ -1,25 +1,26 @@
|
||||
import { Job, Worker } from "bullmq";
|
||||
import { redis } from "lib/db/redis.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { WorkerError } from "src/worker.ts";
|
||||
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
|
||||
import { WorkerError } from "../lib/mq/schema.ts";
|
||||
|
||||
const filterWorker = new Worker(
|
||||
"classifyVideo",
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case "classifyVideo":
|
||||
return await getVideoTagsWorker(job);
|
||||
return await classifyVideoWorker(job);
|
||||
case "classifyVideos":
|
||||
return await getVideoTagsInitializer();
|
||||
return await classifyVideosWorker();
|
||||
default:
|
||||
break;
|
||||
}
|
||||
},
|
||||
{ connection: redis, concurrency: 1, removeOnComplete: { count: 1440 } },
|
||||
{ connection: redis, concurrency: 4, removeOnComplete: { count: 1000 } },
|
||||
);
|
||||
|
||||
filterWorker.on("active", () => {
|
||||
logger.log("Worker activated.", "mq");
|
||||
logger.log("Worker (filter) activated.", "mq");
|
||||
});
|
||||
|
||||
filterWorker.on("error", (err) => {
|
||||
|
@ -5,19 +5,7 @@ import logger from "lib/log/logger.ts";
|
||||
import { getVideoTagsWorker } from "lib/mq/exec/getVideoTags.ts";
|
||||
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
|
||||
import { lockManager } from "lib/mq/lockManager.ts";
|
||||
|
||||
export class WorkerError extends Error {
|
||||
public service?: string;
|
||||
public codePath?: string;
|
||||
public rawError: Error;
|
||||
constructor(rawError: Error, service?: string, codePath?: string) {
|
||||
super(rawError.message);
|
||||
this.name = "WorkerFailure";
|
||||
this.codePath = codePath;
|
||||
this.service = service;
|
||||
this.rawError = rawError;
|
||||
}
|
||||
}
|
||||
import { WorkerError } from "../lib/mq/schema.ts";
|
||||
|
||||
Deno.addSignalListener("SIGINT", async () => {
|
||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||
@ -48,7 +36,7 @@ const latestVideoWorker = new Worker(
|
||||
);
|
||||
|
||||
latestVideoWorker.on("active", () => {
|
||||
logger.log("Worker activated.", "mq");
|
||||
logger.log("Worker (latestVideos) activated.", "mq");
|
||||
});
|
||||
|
||||
latestVideoWorker.on("error", (err) => {
|
||||
@ -82,7 +70,7 @@ const videoTagsWorker = new Worker(
|
||||
);
|
||||
|
||||
videoTagsWorker.on("active", () => {
|
||||
logger.log("Worker activated.", "mq");
|
||||
logger.log("Worker (videoTags) activated.", "mq");
|
||||
});
|
||||
|
||||
videoTagsWorker.on("error", (err) => {
|
||||
|
Loading…
Reference in New Issue
Block a user