diff --git a/deno.json b/deno.json index 7392dae..07a17bd 100644 --- a/deno.json +++ b/deno.json @@ -10,7 +10,7 @@ "build": "deno run -A dev.ts build", "preview": "deno run -A main.ts", "update": "deno run -A -r https://fresh.deno.dev/update .", - "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", + "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts", "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", diff --git a/filter/modelV3.py b/filter/modelV3.py deleted file mode 100644 index 24bf951..0000000 --- a/filter/modelV3.py +++ /dev/null @@ -1,58 +0,0 @@ -import torch -import torch.nn as nn - -class VideoClassifierV3(nn.Module): - def __init__(self, embedding_dim=1024, hidden_dim=256, output_dim=3): - super().__init__() - self.num_channels = 4 - self.channel_names = ['title', 'description', 'tags', 'author_info'] - - # 改进1:带温度系数的通道权重(比原始固定权重更灵活) - self.channel_weights = nn.Parameter(torch.ones(self.num_channels)) - self.temperature = 2.0 # 可调节的平滑系数 - - # 改进2:更稳健的全连接结构 - self.fc = nn.Sequential( - nn.Linear(embedding_dim * self.num_channels, hidden_dim*2), - nn.BatchNorm1d(hidden_dim*2), - nn.Dropout(0.2), - nn.ReLU(), - nn.Linear(hidden_dim*2, hidden_dim), - nn.LayerNorm(hidden_dim), - nn.Linear(hidden_dim, output_dim) - ) - - # 改进3:输出层初始化 - nn.init.xavier_uniform_(self.fc[-1].weight) - nn.init.zeros_(self.fc[-1].bias) - - def forward(self, input_texts, sentence_transformer): - # 合并所有通道文本进行批量编码 - all_texts = [text for channel in self.channel_names for text in input_texts[channel]] - - # 使用SentenceTransformer生成嵌入(保持冻结) - with torch.no_grad(): - task = "classification" - embeddings = torch.tensor( - sentence_transformer.encode(all_texts, task=task), - device=next(self.parameters()).device - ) - - # 分割嵌入并加权 - split_sizes = [len(input_texts[name]) for name in self.channel_names] - channel_features = torch.split(embeddings, split_sizes, dim=0) - channel_features = torch.stack(channel_features, dim=1) # [batch, 4, 1024] - - # 改进4:带温度系数的softmax加权 - weights = torch.softmax(self.channel_weights / self.temperature, dim=0) - weighted_features = channel_features * weights.unsqueeze(0).unsqueeze(-1) - - # 拼接特征 - combined = weighted_features.view(weighted_features.size(0), -1) - - # 全连接层 - return self.fc(combined) - - def get_channel_weights(self): - """获取各通道权重(带温度调节)""" - return torch.softmax(self.channel_weights / self.temperature, dim=0).detach().cpu().numpy() \ No newline at end of file diff --git a/lib/db/schema.d.ts b/lib/db/schema.d.ts index 3c02c3f..d93f736 100644 --- a/lib/db/schema.d.ts +++ b/lib/db/schema.d.ts @@ -18,3 +18,16 @@ export interface BiliUserType { desc: string; fans: number; } + +export interface VideoSnapshotType { + id: number; + created_at: string; + views: number; + coins: number; + likes: number; + favorites: number; + shares: number; + danmakus: number; + aid: bigint; + replies: number; +} diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts new file mode 100644 index 0000000..9002894 --- /dev/null +++ b/lib/db/snapshot.ts @@ -0,0 +1,57 @@ +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { VideoSnapshotType } from "lib/db/schema.d.ts"; + +export async function getSongsNearMilestone(client: Client) { + const queryResult = await client.queryObject(` + WITH max_views_per_aid AS ( + -- 找出每个 aid 的最大 views 值,并确保 aid 存在于 songs 表中 + SELECT + vs.aid, + MAX(vs.views) AS max_views + FROM + video_snapshot vs + INNER JOIN + songs s + ON + vs.aid = s.aid + GROUP BY + vs.aid + ), + filtered_max_views AS ( + -- 筛选出满足条件的最大 views + SELECT + aid, + max_views + FROM + max_views_per_aid + WHERE + (max_views >= 90000 AND max_views < 100000) OR + (max_views >= 900000 AND max_views < 1000000) + ) + -- 获取符合条件的完整行数据 + SELECT + vs.* + FROM + video_snapshot vs + INNER JOIN + filtered_max_views fmv + ON + vs.aid = fmv.aid AND vs.views = fmv.max_views + `); + return queryResult.rows.map((row) => { + return { + ...row, + aid: Number(row.aid), + } + }); +} + +export async function getUnsnapshotedSongs(client: Client) { + const queryResult = await client.queryObject<{aid: bigint}>(` + SELECT DISTINCT s.aid + FROM songs s + LEFT JOIN video_snapshot v ON s.aid = v.aid + WHERE v.aid IS NULL; + `); + return queryResult.rows.map(row => Number(row.aid)); +} diff --git a/lib/mq/exec/getLatestVideos.ts b/lib/mq/exec/getLatestVideos.ts index 65067cd..34b5d1a 100644 --- a/lib/mq/exec/getLatestVideos.ts +++ b/lib/mq/exec/getLatestVideos.ts @@ -1,7 +1,7 @@ import { Job } from "bullmq"; import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts"; import { db } from "lib/db/init.ts"; -import { insertVideoInfo } from "lib/mq/task/getVideoInfo.ts"; +import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts"; import { collectSongs } from "lib/mq/task/collectSongs.ts"; export const getLatestVideosWorker = async (_job: Job): Promise => { diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts new file mode 100644 index 0000000..ad4faa4 --- /dev/null +++ b/lib/mq/exec/snapshotTick.ts @@ -0,0 +1,176 @@ +import { Job } from "bullmq"; +import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { db } from "lib/db/init.ts"; +import { getSongsNearMilestone, getUnsnapshotedSongs } from "lib/db/snapshot.ts"; +import { SnapshotQueue } from "lib/mq/index.ts"; +import { insertVideoStats } from "lib/mq/task/getVideoStats.ts"; +import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; +import { redis } from "lib/db/redis.ts"; +import { NetSchedulerError } from "lib/mq/scheduler.ts"; +import logger from "lib/log/logger.ts"; + +async function snapshotScheduled(aid: number) { + try { + return await redis.exists(`cvsa:snapshot:${aid}`); + } catch { + logger.error(`Failed to check scheduled status for ${aid}`, "mq"); + return false; + } +} + +async function setSnapshotScheduled(aid: number, value: boolean, exp: number) { + try { + if (value) { + await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp); + } else { + await redis.del(`cvsa:snapshot:${aid}`); + } + } catch { + logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq"); + } +} + +interface SongNearMilestone { + aid: number; + id: number; + created_at: string; + views: number; + coins: number; + likes: number; + favorites: number; + shares: number; + danmakus: number; + replies: number; +} + +async function processMilestoneSnapshots(vidoesNearMilestone: SongNearMilestone[]) { + let i = 0; + for (const snapshot of vidoesNearMilestone) { + if (await snapshotScheduled(snapshot.aid)) { + continue; + } + const factor = Math.floor(i / 8); + const delayTime = factor * SECOND * 2; + SnapshotQueue.add("snapshotMilestoneVideo", { + aid: snapshot.aid, + currentViews: snapshot.views, + snapshotedAt: snapshot.created_at, + }, { delay: delayTime }); + await setSnapshotScheduled(snapshot.aid, true, 20 * 60); + i++; + } +} + +async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) { + let i = 0; + for (const aid of unsnapshotedVideos) { + if (await snapshotScheduled(aid)) { + continue; + } + const factor = Math.floor(i / 5); + const delayTime = factor * SECOND * 4; + SnapshotQueue.add("snapshotVideo", { + aid, + }, { delay: delayTime }); + await setSnapshotScheduled(aid, true, 6 * 60 * 60); + i++; + } +} + +export const snapshotTickWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const vidoesNearMilestone = await getSongsNearMilestone(client); + await processMilestoneSnapshots(vidoesNearMilestone); + + const unsnapshotedVideos = await getUnsnapshotedSongs(client); + await processUnsnapshotedVideos(unsnapshotedVideos); + } finally { + client.release(); + } +}; + +export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { + const client = await db.connect(); + await setSnapshotScheduled(job.data.aid, true, 20 * 60); + try { + const { aid, currentViews, lastSnapshoted } = job.data; + const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo"); + if (stat == null) { + setSnapshotScheduled(aid, false, 0); + return; + } + const nextMilestone = currentViews >= 100000 ? 1000000 : 100000; + if (stat.views >= nextMilestone) { + setSnapshotScheduled(aid, false, 0); + return; + } + const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; + const viewsIncrement = stat.views - currentViews; + const incrementSpeed = viewsIncrement / intervalSeconds; + const viewsToIncrease = nextMilestone - stat.views; + const eta = viewsToIncrease / incrementSpeed; + const scheduledNextSnapshotDelay = eta * SECOND / 3; + const maxInterval = 20 * MINUTE; + const delay = Math.min(scheduledNextSnapshotDelay, maxInterval); + SnapshotQueue.add("snapshotMilestoneVideo", { + aid, + currentViews: stat.views, + snapshotedAt: stat.time, + }, { delay }); + } catch (e) { + if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { + logger.warn( + `No available proxy for aid ${job.data.aid}.`, + "mq", + "fn:takeSnapshotForMilestoneVideoWorker", + ); + SnapshotQueue.add("snapshotMilestoneVideo", { + aid: job.data.aid, + currentViews: job.data.currentViews, + snapshotedAt: job.data.snapshotedAt, + }, { delay: 5 * SECOND }); + return; + } + throw e; + } finally { + client.release(); + } +}; + +export const takeSnapshotForVideoWorker = async (job: Job) => { + const client = await db.connect(); + await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60); + try { + const { aid } = job.data; + const stat = await insertVideoStats(client, aid, "getVideoInfo"); + if (stat == null) { + setSnapshotScheduled(aid, false, 0); + return; + } + const nearMilestone = (stat.views >= 90000 && stat.views < 100000) || + (stat.views >= 900000 && stat.views < 1000000); + if (nearMilestone) { + SnapshotQueue.add("snapshotMilestoneVideo", { + aid, + currentViews: stat.views, + snapshotedAt: stat.time, + }, { delay: 0 }); + } + } catch (e) { + if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { + logger.warn( + `No available proxy for aid ${job.data.aid}.`, + "mq", + "fn:takeSnapshotForMilestoneVideoWorker", + ); + SnapshotQueue.add("snapshotVideo", { + aid: job.data.aid, + }, { delay: 10 * SECOND }); + return; + } + throw e; + } finally { + client.release(); + } +}; diff --git a/lib/mq/index.ts b/lib/mq/index.ts index 9a22495..ef8b0f2 100644 --- a/lib/mq/index.ts +++ b/lib/mq/index.ts @@ -3,3 +3,5 @@ import { Queue } from "bullmq"; export const LatestVideosQueue = new Queue("latestVideos"); export const ClassifyVideoQueue = new Queue("classifyVideo"); + +export const SnapshotQueue = new Queue("snapshot"); diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 1073471..df9c94d 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -1,17 +1,21 @@ import { MINUTE } from "$std/datetime/constants.ts"; -import { ClassifyVideoQueue, LatestVideosQueue } from "lib/mq/index.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; import logger from "lib/log/logger.ts"; export async function initMQ() { - await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { - every: 1 * MINUTE, - immediately: true, - }); - await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { - every: 5 * MINUTE, - immediately: true, - }); - await LatestVideosQueue.upsertJobScheduler("collectSongs", { + // await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { + // every: 1 * MINUTE, + // immediately: true, + // }); + // await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { + // every: 5 * MINUTE, + // immediately: true, + // }); + // await LatestVideosQueue.upsertJobScheduler("collectSongs", { + // every: 3 * MINUTE, + // immediately: true, + // }); + await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", { every: 3 * MINUTE, immediately: true, }); diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index 70a45c0..aa2d11f 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -110,11 +110,10 @@ class NetScheduler { async triggerLimiter(task: string, proxy: string): Promise { const limiterId = "proxy-" + proxy + "-" + task; - if (!this.proxyLimiters[limiterId]) { - return; - } + const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider; try { - await this.proxyLimiters[limiterId].trigger(); + await this.proxyLimiters[limiterId]?.trigger(); + await this.providerLimiters[providerLimiterId]?.trigger(); } catch (e) { const error = e as Error; if (e instanceof Redis.ReplyError) { @@ -215,8 +214,10 @@ class NetScheduler { const provider = task.provider; const proxyLimiterId = "proxy-" + proxyName + "-" + task; const providerLimiterId = "provider-" + proxyName + "-" + provider; - if (!this.proxyLimiters[proxyLimiterId] || !this.providerLimiters[providerLimiterId]) { - return true; + if (!this.proxyLimiters[proxyLimiterId]) { + const providerLimiter = this.providerLimiters[providerLimiterId]; + const providerAvailable = await providerLimiter.getAvailability(); + return providerAvailable; } const proxyLimiter = this.proxyLimiters[proxyLimiterId]; const providerLimiter = this.providerLimiters[providerLimiterId]; @@ -253,23 +254,23 @@ class NetScheduler { } private async alicloudFcRequest(url: string, region: string): Promise { - const decoder = new TextDecoder(); - const output = await new Deno.Command("aliyun", { - args: [ - "fc", - "POST", - `/2023-03-30/functions/proxy-${region}/invocations`, - "--qualifier", - "LATEST", - "--header", - "Content-Type=application/json;x-fc-invocation-type=Sync;x-fc-log-type=None;", - "--body", - JSON.stringify({ url: url }), - "--profile", - `CVSA-${region}`, - ], - }).output(); try { + const decoder = new TextDecoder(); + const output = await new Deno.Command("aliyun", { + args: [ + "fc", + "POST", + `/2023-03-30/functions/proxy-${region}/invocations`, + "--qualifier", + "LATEST", + "--header", + "Content-Type=application/json;x-fc-invocation-type=Sync;x-fc-log-type=None;", + "--body", + JSON.stringify({ url: url }), + "--profile", + `CVSA-${region}`, + ], + }).output(); const out = decoder.decode(output.stdout); const rawData = JSON.parse(out); if (rawData.statusCode !== 200) { @@ -278,7 +279,7 @@ class NetScheduler { "ALICLOUD_PROXY_ERR", ); } else { - return JSON.parse(rawData.body) as R; + return JSON.parse(JSON.parse(rawData.body)) as R; } } catch (e) { throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e); @@ -308,7 +309,7 @@ const videoInfoRateLimiterConfig: RateLimiterConfig[] = [ const biliLimiterConfig: RateLimiterConfig[] = [ { window: new SlidingWindow(redis, 1), - max: 5, + max: 6, }, { window: new SlidingWindow(redis, 30), @@ -316,14 +317,51 @@ const biliLimiterConfig: RateLimiterConfig[] = [ }, { window: new SlidingWindow(redis, 5 * 60), - max: 180, + max: 200, }, ]; + +/* +Execution order for setup: + +1. addProxy(proxyName, type, data): + - Must be called first. Registers proxies in the system, making them available for tasks. + - Define all proxies before proceeding to define tasks or set up limiters. +2. addTask(taskName, provider, proxies): + - Call after addProxy. Defines tasks and associates them with providers and proxies. + - Relies on proxies being already added. + - Must be called before setting task-specific or provider-specific limiters. +3. setTaskLimiter(taskName, config): + - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies. + - Depends on tasks and proxies being defined to apply limiters correctly. +4. setProviderLimiter(providerName, config): + - Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. + - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to. + +In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter). +The order of setTaskLimiter and setProviderLimiter relative to each other is flexible, +but both should come after addProxy and addTask to ensure proper setup and dependencies are met. +*/ + netScheduler.addProxy("native", "native", ""); -netScheduler.addTask("getVideoInfo", "bilibili", "all"); -netScheduler.addTask("getLatestVideos", "bilibili", "all"); +for (const region of ["shanghai", "hangzhou", "qingdao", "beijing", "zhangjiakou", "chengdu", "shenzhen", "hohhot"]) { + netScheduler.addProxy(`alicloud-${region}`, "alicloud-fc", region); +} +netScheduler.addTask("getVideoInfo", "bilibili", ["native"]); +netScheduler.addTask("getLatestVideos", "bilibili", ["native"]); +netScheduler.addTask("snapshotMilestoneVideo", "bilibili", "all"); +netScheduler.addTask("snapshotVideo", "bilibili", [ + "alicloud-qingdao", + "alicloud-shanghai", + "alicloud-zhangjiakou", + "alicloud-chengdu", + "alicloud-shenzhen", + "alicloud-hohhot", +]); netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig); netScheduler.setTaskLimiter("getLatestVideos", null); +netScheduler.setTaskLimiter("snapshotMilestoneVideo", null); +netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig); netScheduler.setProviderLimiter("bilibili", biliLimiterConfig); export default netScheduler; diff --git a/lib/mq/task/getVideoInfo.ts b/lib/mq/task/getVideoDetails.ts similarity index 94% rename from lib/mq/task/getVideoInfo.ts rename to lib/mq/task/getVideoDetails.ts index 6f0ba58..ead8dd0 100644 --- a/lib/mq/task/getVideoInfo.ts +++ b/lib/mq/task/getVideoDetails.ts @@ -1,5 +1,5 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { getVideoInfo } from "lib/net/getVideoInfo.ts"; +import { getVideoDetails } from "lib/net/getVideoDetails.ts"; import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; import logger from "lib/log/logger.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts"; @@ -10,7 +10,7 @@ export async function insertVideoInfo(client: Client, aid: number) { if (videoExists) { return; } - const data = await getVideoInfo(aid); + const data = await getVideoDetails(aid); if (data === null) { return null; } diff --git a/lib/mq/task/getVideoStats.ts b/lib/mq/task/getVideoStats.ts new file mode 100644 index 0000000..dd9d1f4 --- /dev/null +++ b/lib/mq/task/getVideoStats.ts @@ -0,0 +1,32 @@ +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { getVideoInfo } from "lib/net/getVideoInfo.ts"; + +export async function insertVideoStats(client: Client, aid: number, task: string) { + const data = await getVideoInfo(aid, task); + const time = new Date().getTime(); + if (data === null) { + return null; + } + const views = data.stat.view; + const danmakus = data.stat.danmaku; + const replies = data.stat.reply; + const likes = data.stat.like; + const coins = data.stat.coin; + const shares = data.stat.share; + const favorites = data.stat.favorite; + await client.queryObject(` + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `, [aid, views, danmakus, replies, likes, coins, shares, favorites]); + return { + aid, + views, + danmakus, + replies, + likes, + coins, + shares, + favorites, + time + } +} diff --git a/lib/net/bilibili.d.ts b/lib/net/bilibili.d.ts index 16c70a0..209b566 100644 --- a/lib/net/bilibili.d.ts +++ b/lib/net/bilibili.d.ts @@ -8,6 +8,27 @@ interface BaseResponse { export type VideoListResponse = BaseResponse; export type VideoDetailsResponse = BaseResponse; export type VideoTagsResponse = BaseResponse; +export type VideoInfoResponse = BaseResponse; + +interface VideoInfoData { + bvid: string; + aid: number; + copyright: number; + pic: string; + title: string; + pubdate: number; + ctime: number; + desc: string; + desc_v2: string; + state: number; + duration: number; + owner: { + mid: number; + name: string; + face: string; + }, + stat: VideoStats, +} interface VideoDetailsData { View: { diff --git a/lib/net/getVideoDetails.ts b/lib/net/getVideoDetails.ts new file mode 100644 index 0000000..1cd7a01 --- /dev/null +++ b/lib/net/getVideoDetails.ts @@ -0,0 +1,14 @@ +import netScheduler from "lib/mq/scheduler.ts"; +import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts"; +import logger from "lib/log/logger.ts"; + +export async function getVideoDetails(aid: number): Promise { + const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; + const data = await netScheduler.request(url, "getVideoInfo"); + const errMessage = `Error fetching metadata for ${aid}:`; + if (data.code !== 0) { + logger.error(errMessage + data.message, "net", "fn:getVideoInfo"); + return null; + } + return data.data; +} diff --git a/lib/net/getVideoInfo.ts b/lib/net/getVideoInfo.ts index 0cd78ac..6540f05 100644 --- a/lib/net/getVideoInfo.ts +++ b/lib/net/getVideoInfo.ts @@ -1,10 +1,10 @@ import netScheduler from "lib/mq/scheduler.ts"; -import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts"; +import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts"; import logger from "lib/log/logger.ts"; -export async function getVideoInfo(aid: number): Promise { - const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; - const data = await netScheduler.request(url, "getVideoInfo"); +export async function getVideoInfo(aid: number, task: string): Promise { + const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`; + const data = await netScheduler.request(url, task); const errMessage = `Error fetching metadata for ${aid}:`; if (data.code !== 0) { logger.error(errMessage + data.message, "net", "fn:getVideoInfo"); diff --git a/src/bullui.ts b/src/bullui.ts index 407d1c5..acf8d3f 100644 --- a/src/bullui.ts +++ b/src/bullui.ts @@ -2,7 +2,7 @@ import express from "express"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { ExpressAdapter } from "@bull-board/express"; -import { ClassifyVideoQueue, LatestVideosQueue } from "lib/mq/index.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath("/"); @@ -11,6 +11,7 @@ createBullBoard({ queues: [ new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(ClassifyVideoQueue), + new BullMQAdapter(SnapshotQueue), ], serverAdapter: serverAdapter, }); diff --git a/src/worker.ts b/src/worker.ts index bc9af5b..51d3bf8 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -5,6 +5,7 @@ import logger from "lib/log/logger.ts"; import { lockManager } from "lib/mq/lockManager.ts"; import { WorkerError } from "lib/mq/schema.ts"; import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; +import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts"; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); @@ -50,3 +51,32 @@ latestVideoWorker.on("error", (err) => { latestVideoWorker.on("closed", async () => { await lockManager.releaseLock("getLatestVideos"); }); + +const snapshotWorker = new Worker( + "snapshot", + async (job: Job) => { + switch (job.name) { + case "scheduleSnapshotTick": + await snapshotTickWorker(job); + break; + case "snapshotMilestoneVideo": + await takeSnapshotForMilestoneVideoWorker(job); + break; + case "snapshotVideo": + await takeSnapshotForVideoWorker(job); + break; + default: + break; + } + }, + { connection: redis, concurrency: 20, removeOnComplete: { count: 1440 } }, +); + +snapshotWorker.on("active", () => { + logger.log("Worker (snapshot) activated.", "mq"); +}) + +snapshotWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +})