add: snapshot feat for new videos and songs near milestone

This commit is contained in:
alikia2x (寒寒) 2025-03-09 05:35:20 +08:00
parent fa414e89ce
commit 5cc2afeb0e
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
16 changed files with 434 additions and 104 deletions

View File

@ -10,7 +10,7 @@
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",

View File

@ -1,58 +0,0 @@
import torch
import torch.nn as nn
class VideoClassifierV3(nn.Module):
def __init__(self, embedding_dim=1024, hidden_dim=256, output_dim=3):
super().__init__()
self.num_channels = 4
self.channel_names = ['title', 'description', 'tags', 'author_info']
# 改进1带温度系数的通道权重比原始固定权重更灵活
self.channel_weights = nn.Parameter(torch.ones(self.num_channels))
self.temperature = 2.0 # 可调节的平滑系数
# 改进2更稳健的全连接结构
self.fc = nn.Sequential(
nn.Linear(embedding_dim * self.num_channels, hidden_dim*2),
nn.BatchNorm1d(hidden_dim*2),
nn.Dropout(0.2),
nn.ReLU(),
nn.Linear(hidden_dim*2, hidden_dim),
nn.LayerNorm(hidden_dim),
nn.Linear(hidden_dim, output_dim)
)
# 改进3输出层初始化
nn.init.xavier_uniform_(self.fc[-1].weight)
nn.init.zeros_(self.fc[-1].bias)
def forward(self, input_texts, sentence_transformer):
# 合并所有通道文本进行批量编码
all_texts = [text for channel in self.channel_names for text in input_texts[channel]]
# 使用SentenceTransformer生成嵌入保持冻结
with torch.no_grad():
task = "classification"
embeddings = torch.tensor(
sentence_transformer.encode(all_texts, task=task),
device=next(self.parameters()).device
)
# 分割嵌入并加权
split_sizes = [len(input_texts[name]) for name in self.channel_names]
channel_features = torch.split(embeddings, split_sizes, dim=0)
channel_features = torch.stack(channel_features, dim=1) # [batch, 4, 1024]
# 改进4带温度系数的softmax加权
weights = torch.softmax(self.channel_weights / self.temperature, dim=0)
weighted_features = channel_features * weights.unsqueeze(0).unsqueeze(-1)
# 拼接特征
combined = weighted_features.view(weighted_features.size(0), -1)
# 全连接层
return self.fc(combined)
def get_channel_weights(self):
"""获取各通道权重(带温度调节)"""
return torch.softmax(self.channel_weights / self.temperature, dim=0).detach().cpu().numpy()

13
lib/db/schema.d.ts vendored
View File

@ -18,3 +18,16 @@ export interface BiliUserType {
desc: string;
fans: number;
}
export interface VideoSnapshotType {
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
aid: bigint;
replies: number;
}

57
lib/db/snapshot.ts Normal file
View File

@ -0,0 +1,57 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { VideoSnapshotType } from "lib/db/schema.d.ts";
export async function getSongsNearMilestone(client: Client) {
const queryResult = await client.queryObject<VideoSnapshotType>(`
WITH max_views_per_aid AS (
-- aid views aid songs
SELECT
vs.aid,
MAX(vs.views) AS max_views
FROM
video_snapshot vs
INNER JOIN
songs s
ON
vs.aid = s.aid
GROUP BY
vs.aid
),
filtered_max_views AS (
-- views
SELECT
aid,
max_views
FROM
max_views_per_aid
WHERE
(max_views >= 90000 AND max_views < 100000) OR
(max_views >= 900000 AND max_views < 1000000)
)
--
SELECT
vs.*
FROM
video_snapshot vs
INNER JOIN
filtered_max_views fmv
ON
vs.aid = fmv.aid AND vs.views = fmv.max_views
`);
return queryResult.rows.map((row) => {
return {
...row,
aid: Number(row.aid),
}
});
}
export async function getUnsnapshotedSongs(client: Client) {
const queryResult = await client.queryObject<{aid: bigint}>(`
SELECT DISTINCT s.aid
FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`);
return queryResult.rows.map(row => Number(row.aid));
}

View File

@ -1,7 +1,7 @@
import { Job } from "bullmq";
import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts";
import { db } from "lib/db/init.ts";
import { insertVideoInfo } from "lib/mq/task/getVideoInfo.ts";
import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts";
import { collectSongs } from "lib/mq/task/collectSongs.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {

176
lib/mq/exec/snapshotTick.ts Normal file
View File

@ -0,0 +1,176 @@
import { Job } from "bullmq";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { getSongsNearMilestone, getUnsnapshotedSongs } from "lib/db/snapshot.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { redis } from "lib/db/redis.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import logger from "lib/log/logger.ts";
async function snapshotScheduled(aid: number) {
try {
return await redis.exists(`cvsa:snapshot:${aid}`);
} catch {
logger.error(`Failed to check scheduled status for ${aid}`, "mq");
return false;
}
}
async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
try {
if (value) {
await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
} else {
await redis.del(`cvsa:snapshot:${aid}`);
}
} catch {
logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
}
}
interface SongNearMilestone {
aid: number;
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
replies: number;
}
async function processMilestoneSnapshots(vidoesNearMilestone: SongNearMilestone[]) {
let i = 0;
for (const snapshot of vidoesNearMilestone) {
if (await snapshotScheduled(snapshot.aid)) {
continue;
}
const factor = Math.floor(i / 8);
const delayTime = factor * SECOND * 2;
SnapshotQueue.add("snapshotMilestoneVideo", {
aid: snapshot.aid,
currentViews: snapshot.views,
snapshotedAt: snapshot.created_at,
}, { delay: delayTime });
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
i++;
}
}
async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
let i = 0;
for (const aid of unsnapshotedVideos) {
if (await snapshotScheduled(aid)) {
continue;
}
const factor = Math.floor(i / 5);
const delayTime = factor * SECOND * 4;
SnapshotQueue.add("snapshotVideo", {
aid,
}, { delay: delayTime });
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
i++;
}
}
export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect();
try {
const vidoesNearMilestone = await getSongsNearMilestone(client);
await processMilestoneSnapshots(vidoesNearMilestone);
const unsnapshotedVideos = await getUnsnapshotedSongs(client);
await processUnsnapshotedVideos(unsnapshotedVideos);
} finally {
client.release();
}
};
export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
try {
const { aid, currentViews, lastSnapshoted } = job.data;
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
if (stat == null) {
setSnapshotScheduled(aid, false, 0);
return;
}
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
if (stat.views >= nextMilestone) {
setSnapshotScheduled(aid, false, 0);
return;
}
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
const viewsIncrement = stat.views - currentViews;
const incrementSpeed = viewsIncrement / intervalSeconds;
const viewsToIncrease = nextMilestone - stat.views;
const eta = viewsToIncrease / incrementSpeed;
const scheduledNextSnapshotDelay = eta * SECOND / 3;
const maxInterval = 20 * MINUTE;
const delay = Math.min(scheduledNextSnapshotDelay, maxInterval);
SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay });
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForMilestoneVideoWorker",
);
SnapshotQueue.add("snapshotMilestoneVideo", {
aid: job.data.aid,
currentViews: job.data.currentViews,
snapshotedAt: job.data.snapshotedAt,
}, { delay: 5 * SECOND });
return;
}
throw e;
} finally {
client.release();
}
};
export const takeSnapshotForVideoWorker = async (job: Job) => {
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
try {
const { aid } = job.data;
const stat = await insertVideoStats(client, aid, "getVideoInfo");
if (stat == null) {
setSnapshotScheduled(aid, false, 0);
return;
}
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
(stat.views >= 900000 && stat.views < 1000000);
if (nearMilestone) {
SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay: 0 });
}
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForMilestoneVideoWorker",
);
SnapshotQueue.add("snapshotVideo", {
aid: job.data.aid,
}, { delay: 10 * SECOND });
return;
}
throw e;
} finally {
client.release();
}
};

View File

@ -3,3 +3,5 @@ import { Queue } from "bullmq";
export const LatestVideosQueue = new Queue("latestVideos");
export const ClassifyVideoQueue = new Queue("classifyVideo");
export const SnapshotQueue = new Queue("snapshot");

View File

@ -1,17 +1,21 @@
import { MINUTE } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue } from "lib/mq/index.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
immediately: true,
});
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE,
immediately: true,
});
await LatestVideosQueue.upsertJobScheduler("collectSongs", {
// await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
// every: 1 * MINUTE,
// immediately: true,
// });
// await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
// every: 5 * MINUTE,
// immediately: true,
// });
// await LatestVideosQueue.upsertJobScheduler("collectSongs", {
// every: 3 * MINUTE,
// immediately: true,
// });
await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
every: 3 * MINUTE,
immediately: true,
});

View File

@ -110,11 +110,10 @@ class NetScheduler {
async triggerLimiter(task: string, proxy: string): Promise<void> {
const limiterId = "proxy-" + proxy + "-" + task;
if (!this.proxyLimiters[limiterId]) {
return;
}
const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
try {
await this.proxyLimiters[limiterId].trigger();
await this.proxyLimiters[limiterId]?.trigger();
await this.providerLimiters[providerLimiterId]?.trigger();
} catch (e) {
const error = e as Error;
if (e instanceof Redis.ReplyError) {
@ -215,8 +214,10 @@ class NetScheduler {
const provider = task.provider;
const proxyLimiterId = "proxy-" + proxyName + "-" + task;
const providerLimiterId = "provider-" + proxyName + "-" + provider;
if (!this.proxyLimiters[proxyLimiterId] || !this.providerLimiters[providerLimiterId]) {
return true;
if (!this.proxyLimiters[proxyLimiterId]) {
const providerLimiter = this.providerLimiters[providerLimiterId];
const providerAvailable = await providerLimiter.getAvailability();
return providerAvailable;
}
const proxyLimiter = this.proxyLimiters[proxyLimiterId];
const providerLimiter = this.providerLimiters[providerLimiterId];
@ -253,6 +254,7 @@ class NetScheduler {
}
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
try {
const decoder = new TextDecoder();
const output = await new Deno.Command("aliyun", {
args: [
@ -269,7 +271,6 @@ class NetScheduler {
`CVSA-${region}`,
],
}).output();
try {
const out = decoder.decode(output.stdout);
const rawData = JSON.parse(out);
if (rawData.statusCode !== 200) {
@ -278,7 +279,7 @@ class NetScheduler {
"ALICLOUD_PROXY_ERR",
);
} else {
return JSON.parse(rawData.body) as R;
return JSON.parse(JSON.parse(rawData.body)) as R;
}
} catch (e) {
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
@ -308,7 +309,7 @@ const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
const biliLimiterConfig: RateLimiterConfig[] = [
{
window: new SlidingWindow(redis, 1),
max: 5,
max: 6,
},
{
window: new SlidingWindow(redis, 30),
@ -316,14 +317,51 @@ const biliLimiterConfig: RateLimiterConfig[] = [
},
{
window: new SlidingWindow(redis, 5 * 60),
max: 180,
max: 200,
},
];
/*
Execution order for setup:
1. addProxy(proxyName, type, data):
- Must be called first. Registers proxies in the system, making them available for tasks.
- Define all proxies before proceeding to define tasks or set up limiters.
2. addTask(taskName, provider, proxies):
- Call after addProxy. Defines tasks and associates them with providers and proxies.
- Relies on proxies being already added.
- Must be called before setting task-specific or provider-specific limiters.
3. setTaskLimiter(taskName, config):
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
The order of setTaskLimiter and setProviderLimiter relative to each other is flexible,
but both should come after addProxy and addTask to ensure proper setup and dependencies are met.
*/
netScheduler.addProxy("native", "native", "");
netScheduler.addTask("getVideoInfo", "bilibili", "all");
netScheduler.addTask("getLatestVideos", "bilibili", "all");
for (const region of ["shanghai", "hangzhou", "qingdao", "beijing", "zhangjiakou", "chengdu", "shenzhen", "hohhot"]) {
netScheduler.addProxy(`alicloud-${region}`, "alicloud-fc", region);
}
netScheduler.addTask("getVideoInfo", "bilibili", ["native"]);
netScheduler.addTask("getLatestVideos", "bilibili", ["native"]);
netScheduler.addTask("snapshotMilestoneVideo", "bilibili", "all");
netScheduler.addTask("snapshotVideo", "bilibili", [
"alicloud-qingdao",
"alicloud-shanghai",
"alicloud-zhangjiakou",
"alicloud-chengdu",
"alicloud-shenzhen",
"alicloud-hohhot",
]);
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
netScheduler.setTaskLimiter("getLatestVideos", null);
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig);
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
export default netScheduler;

View File

@ -1,5 +1,5 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { getVideoDetails } from "lib/net/getVideoDetails.ts";
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import logger from "lib/log/logger.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
@ -10,7 +10,7 @@ export async function insertVideoInfo(client: Client, aid: number) {
if (videoExists) {
return;
}
const data = await getVideoInfo(aid);
const data = await getVideoDetails(aid);
if (data === null) {
return null;
}

View File

@ -0,0 +1,32 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
export async function insertVideoStats(client: Client, aid: number, task: string) {
const data = await getVideoInfo(aid, task);
const time = new Date().getTime();
if (data === null) {
return null;
}
const views = data.stat.view;
const danmakus = data.stat.danmaku;
const replies = data.stat.reply;
const likes = data.stat.like;
const coins = data.stat.coin;
const shares = data.stat.share;
const favorites = data.stat.favorite;
await client.queryObject(`
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, [aid, views, danmakus, replies, likes, coins, shares, favorites]);
return {
aid,
views,
danmakus,
replies,
likes,
coins,
shares,
favorites,
time
}
}

21
lib/net/bilibili.d.ts vendored
View File

@ -8,6 +8,27 @@ interface BaseResponse<T> {
export type VideoListResponse = BaseResponse<VideoListData>;
export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
export type VideoTagsResponse = BaseResponse<VideoTagsData>;
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
interface VideoInfoData {
bvid: string;
aid: number;
copyright: number;
pic: string;
title: string;
pubdate: number;
ctime: number;
desc: string;
desc_v2: string;
state: number;
duration: number;
owner: {
mid: number;
name: string;
face: string;
},
stat: VideoStats,
}
interface VideoDetailsData {
View: {

View File

@ -0,0 +1,14 @@
import netScheduler from "lib/mq/scheduler.ts";
import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
const data = await netScheduler.request<VideoDetailsResponse>(url, "getVideoInfo");
const errMessage = `Error fetching metadata for ${aid}:`;
if (data.code !== 0) {
logger.error(errMessage + data.message, "net", "fn:getVideoInfo");
return null;
}
return data.data;
}

View File

@ -1,10 +1,10 @@
import netScheduler from "lib/mq/scheduler.ts";
import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
export async function getVideoInfo(aid: number): Promise<VideoDetailsData | null> {
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
const data = await netScheduler.request<VideoDetailsResponse>(url, "getVideoInfo");
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | null> {
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
const data = await netScheduler.request<VideoInfoResponse>(url, task);
const errMessage = `Error fetching metadata for ${aid}:`;
if (data.code !== 0) {
logger.error(errMessage + data.message, "net", "fn:getVideoInfo");

View File

@ -2,7 +2,7 @@ import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import { ClassifyVideoQueue, LatestVideosQueue } from "lib/mq/index.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/");
@ -11,6 +11,7 @@ createBullBoard({
queues: [
new BullMQAdapter(LatestVideosQueue),
new BullMQAdapter(ClassifyVideoQueue),
new BullMQAdapter(SnapshotQueue),
],
serverAdapter: serverAdapter,
});

View File

@ -5,6 +5,7 @@ import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -50,3 +51,32 @@ latestVideoWorker.on("error", (err) => {
latestVideoWorker.on("closed", async () => {
await lockManager.releaseLock("getLatestVideos");
});
const snapshotWorker = new Worker(
"snapshot",
async (job: Job) => {
switch (job.name) {
case "scheduleSnapshotTick":
await snapshotTickWorker(job);
break;
case "snapshotMilestoneVideo":
await takeSnapshotForMilestoneVideoWorker(job);
break;
case "snapshotVideo":
await takeSnapshotForVideoWorker(job);
break;
default:
break;
}
},
{ connection: redis, concurrency: 20, removeOnComplete: { count: 1440 } },
);
snapshotWorker.on("active", () => {
logger.log("Worker (snapshot) activated.", "mq");
})
snapshotWorker.on("error", (err) => {
const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath);
})