diff --git a/.zed/settings.json b/.zed/settings.json index 97f9ab8..a58d028 100644 --- a/.zed/settings.json +++ b/.zed/settings.json @@ -3,33 +3,33 @@ // For a full list of overridable settings, and general information on folder-specific settings, // see the documentation: https://zed.dev/docs/configuring-zed#settings-files { - "lsp": { - "deno": { - "settings": { - "deno": { - "enable": true - } - } - } - }, - "languages": { - "TypeScript": { - "language_servers": [ - "deno", - "!typescript-language-server", - "!vtsls", - "!eslint" - ], - "formatter": "language_server" - }, - "TSX": { - "language_servers": [ - "deno", - "!typescript-language-server", - "!vtsls", - "!eslint" - ], - "formatter": "language_server" - } - } + "lsp": { + "deno": { + "settings": { + "deno": { + "enable": true + } + } + } + }, + "languages": { + "TypeScript": { + "language_servers": [ + "deno", + "!typescript-language-server", + "!vtsls", + "!eslint" + ], + "formatter": "language_server" + }, + "TSX": { + "language_servers": [ + "deno", + "!typescript-language-server", + "!vtsls", + "!eslint" + ], + "formatter": "language_server" + } + } } diff --git a/lib/db/allData.ts b/lib/db/allData.ts index ddcb804..00fe22e 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -3,7 +3,10 @@ import { AllDataType, BiliUserType } from "lib/db/schema.d.ts"; import Akari from "lib/ml/akari.ts"; export async function videoExistsInAllData(client: Client, aid: number) { - return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`, [aid]) + return await client.queryObject<{ exists: boolean }>( + `SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`, + [aid], + ) .then((result) => result.rows[0].exists); } diff --git a/lib/db/schema.d.ts b/lib/db/schema.d.ts index d93f736..983389c 100644 --- a/lib/db/schema.d.ts +++ b/lib/db/schema.d.ts @@ -31,3 +31,13 @@ export interface VideoSnapshotType { aid: bigint; replies: number; } + +export interface SnapshotScheduleType { + id: number; + aid: number; + type?: string; + created_at: string; + started_at?: string; + finished_at?: string; + status: string; +} diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 583d06a..8fd54fc 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -1,4 +1,7 @@ +import { DAY, MINUTE } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; +import { SnapshotScheduleType } from "./schema.d.ts"; /* Returns true if the specified `aid` has at least one record with "pending" or "processing" status. @@ -22,11 +25,12 @@ export async function findClosestSnapshot( targetTime: Date, ): Promise { const query = ` - SELECT created_at, views FROM video_snapshot + SELECT created_at, views + FROM video_snapshot WHERE aid = $1 - ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) ASC + ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) LIMIT 1 - `; + `; const result = await client.queryObject<{ created_at: string; views: number }>( query, [aid, targetTime.toISOString()], @@ -39,7 +43,7 @@ export async function findClosestSnapshot( }; } -export async function getLatestSnapshot(client: Client, aid: number): Promise{ +export async function getLatestSnapshot(client: Client, aid: number): Promise { const res = await client.queryObject<{ created_at: string; views: number }>( `SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`, [aid], @@ -49,5 +53,112 @@ export async function getLatestSnapshot(client: Client, aid: number): Promise(query, [startTimeString, endTimeString]); + return res.rows[0].count; +} + +/* + * Creates a new snapshot schedule record. + * @param client The database client. + * @param aid The aid of the video. + * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds) + */ +export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { + const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime)); + return client.queryObject( + `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, + [aid, type, ajustedTime.toISOString()], + ); +} + +/** + * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit + * @param client PostgreSQL client + * @param expectedStartTime The expected snapshot time + * @returns The adjusted actual snapshot time + */ +export async function adjustSnapshotTime( + client: Client, + expectedStartTime: Date, +): Promise { + const findWindowQuery = ` + WITH windows AS ( + SELECT generate_series( + $1::timestamp, -- Start time: current time truncated to the nearest 5-minute window + $2::timestamp, -- End time: 24 hours after the target time window starts + INTERVAL '5 MINUTES' + ) AS window_start + ) + SELECT w.window_start + FROM windows w + LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start + AND s.started_at < w.window_start + INTERVAL '5 MINUTES' + AND s.status = 'pending' + GROUP BY w.window_start + HAVING COUNT(s.*) < 2000 + ORDER BY w.window_start + LIMIT 1; + `; + for (let i = 0; i < 7; i++) { + const now = new Date(new Date().getTime() + 5 * MINUTE); + const nowTruncated = truncateTo5MinInterval(now); + const currentWindowStart = truncateTo5MinInterval(expectedStartTime); + const end = new Date(currentWindowStart.getTime() + 1 * DAY); + + const windowResult = await client.queryObject<{ window_start: Date }>( + findWindowQuery, + [nowTruncated, end], + ); + + const windowStart = windowResult.rows[0]?.window_start; + if (!windowStart) { + continue; + } + + return windowStart; + } + return expectedStartTime; +} + +/** + * Truncate the timestamp to the nearest 5-minute interval + * @param timestamp The timestamp + * @returns The truncated time + */ +function truncateTo5MinInterval(timestamp: Date): Date { + const minutes = timestamp.getMinutes() - (timestamp.getMinutes() % 5); + return new Date( + timestamp.getFullYear(), + timestamp.getMonth(), + timestamp.getDate(), + timestamp.getHours(), + minutes, + 0, + 0, + ); +} + +export async function getSnapshotsInNextSecond(client: Client) { + const res = await client.queryObject( + `SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`, + [], + ); + return res.rows; } diff --git a/lib/ml/benchmark.ts b/lib/ml/benchmark.ts index 3911c31..3fc76ac 100644 --- a/lib/ml/benchmark.ts +++ b/lib/ml/benchmark.ts @@ -1,7 +1,6 @@ import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import * as ort from "onnxruntime"; - function softmax(logits: Float32Array): number[] { const maxLogit = Math.max(...logits); const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); diff --git a/lib/ml/manager.ts b/lib/ml/manager.ts index 8f15513..8230fcf 100644 --- a/lib/ml/manager.ts +++ b/lib/ml/manager.ts @@ -21,10 +21,10 @@ export class AIManager { } } - public getModelSession(key: string): ort.InferenceSession { - if (this.sessions[key] === undefined) { - throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession"); - } + public getModelSession(key: string): ort.InferenceSession { + if (this.sessions[key] === undefined) { + throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession"); + } return this.sessions[key]; } diff --git a/lib/ml/mantis.ts b/lib/ml/mantis.ts index 59bc09a..6960be9 100644 --- a/lib/ml/mantis.ts +++ b/lib/ml/mantis.ts @@ -6,19 +6,16 @@ import { WorkerError } from "lib/mq/schema.ts"; const modelPath = "./model/model.onnx"; class MantisProto extends AIManager { - constructor() { super(); - this.models = { - "predictor": modelPath, - } + this.models = { + "predictor": modelPath, + }; } - public override async init(): Promise { - await super.init(); - } - - + public override async init(): Promise { + await super.init(); + } } const Mantis = new MantisProto(); diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index e72d11f..86de99b 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -1,21 +1,37 @@ import { Job } from "bullmq"; import { db } from "lib/db/init.ts"; import { getVideosNearMilestone } from "lib/db/snapshot.ts"; -import { findClosestSnapshot, getLatestSnapshot, videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts"; +import { + findClosestSnapshot, + getLatestSnapshot, + getSnapshotsInNextSecond, + scheduleSnapshot, + videoHasActiveSchedule, +} from "lib/db/snapshotSchedule.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { HOUR, MINUTE } from "$std/datetime/constants.ts"; +import logger from "lib/log/logger.ts"; +import { SnapshotQueue } from "lib/mq/index.ts"; + +const priorityMap: { [key: string]: number } = { + "milestone": 1, +}; export const snapshotTickWorker = async (_job: Job) => { const client = await db.connect(); try { - // TODO: implement + const schedules = await getSnapshotsInNextSecond(client); + for (const schedule of schedules) { + let priority = 3; + if (schedule.type && priorityMap[schedule.type]) + priority = priorityMap[schedule.type]; + await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority }); + } } finally { client.release(); } }; -const log = (a: number, b: number = 10) => Math.log(a) / Math.log(b); - export const closetMilestone = (views: number) => { if (views < 100000) return 100000; if (views < 1000000) return 1000000; @@ -24,6 +40,12 @@ export const closetMilestone = (views: number) => { const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); +/* + * Returns the minimum ETA in hours for the next snapshot + * @param client - Postgres client + * @param aid - aid of the video + * @returns ETA in hours + */ const getAdjustedShortTermETA = async (client: Client, aid: number) => { const latestSnapshot = await getLatestSnapshot(client, aid); // Immediately dispatch a snapshot if there is no snapshot yet @@ -61,10 +83,12 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { if (await videoHasActiveSchedule(client, video.aid)) continue; const eta = await getAdjustedShortTermETA(client, video.aid); if (eta > 72) continue; - // TODO: dispatch snapshot job + const now = Date.now(); + const targetTime = now + eta * HOUR; + await scheduleSnapshot(client, video.aid, "milestone", targetTime); } - } catch (_e) { - // + } catch (e) { + logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); } finally { client.release(); } diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index 00c3a4e..94c9361 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -288,7 +288,11 @@ class NetScheduler { const fileId = randomUUID(); await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout); await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr); - logger.log(`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest") + logger.log( + `Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, + "net", + "fn:alicloudFcRequest", + ); throw new NetSchedulerError( `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`, "ALICLOUD_PROXY_ERR", @@ -301,7 +305,11 @@ class NetScheduler { const fileId = randomUUID(); rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput); rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr); - logger.log(`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest") + logger.log( + `Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, + "net", + "fn:alicloudFcRequest", + ); } logger.error(e as Error, "net", "fn:alicloudFcRequest"); throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e); diff --git a/lib/mq/task/getVideoStats.ts b/lib/mq/task/getVideoStats.ts index 274d1bb..6f85035 100644 --- a/lib/mq/task/getVideoStats.ts +++ b/lib/mq/task/getVideoStats.ts @@ -3,30 +3,33 @@ import { getVideoInfo } from "lib/net/getVideoInfo.ts"; export async function insertVideoStats(client: Client, aid: number, task: string) { const data = await getVideoInfo(aid, task); - const time = new Date().getTime(); - if (typeof data == 'number') { + const time = new Date().getTime(); + if (typeof data == "number") { return data; } - const views = data.stat.view; - const danmakus = data.stat.danmaku; - const replies = data.stat.reply; - const likes = data.stat.like; - const coins = data.stat.coin; - const shares = data.stat.share; - const favorites = data.stat.favorite; - await client.queryObject(` + const views = data.stat.view; + const danmakus = data.stat.danmaku; + const replies = data.stat.reply; + const likes = data.stat.like; + const coins = data.stat.coin; + const shares = data.stat.share; + const favorites = data.stat.favorite; + await client.queryObject( + ` INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - `, [aid, views, danmakus, replies, likes, coins, shares, favorites]); - return { - aid, - views, - danmakus, - replies, - likes, - coins, - shares, - favorites, - time - } + `, + [aid, views, danmakus, replies, likes, coins, shares, favorites], + ); + return { + aid, + views, + danmakus, + replies, + likes, + coins, + shares, + favorites, + time, + }; } diff --git a/lib/net/bilibili.d.ts b/lib/net/bilibili.d.ts index 209b566..6a66ecc 100644 --- a/lib/net/bilibili.d.ts +++ b/lib/net/bilibili.d.ts @@ -26,8 +26,8 @@ interface VideoInfoData { mid: number; name: string; face: string; - }, - stat: VideoStats, + }; + stat: VideoStats; } interface VideoDetailsData { diff --git a/lib/utils/formatSeconds.ts b/lib/utils/formatSeconds.ts index 694f94c..491dfd6 100644 --- a/lib/utils/formatSeconds.ts +++ b/lib/utils/formatSeconds.ts @@ -1,6 +1,6 @@ export const formatSeconds = (seconds: number) => { if (seconds < 60) { - return `${(seconds).toFixed(1)}s`; + return `${seconds.toFixed(1)}s`; } if (seconds < 3600) { return `${Math.floor(seconds / 60)}m${(seconds % 60).toFixed(1)}s`; diff --git a/src/worker.ts b/src/worker.ts index 9523a42..da14706 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -5,7 +5,11 @@ import logger from "lib/log/logger.ts"; import { lockManager } from "lib/mq/lockManager.ts"; import { WorkerError } from "lib/mq/schema.ts"; import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; -import { snapshotTickWorker, collectMilestoneSnapshotsWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts"; +import { + collectMilestoneSnapshotsWorker, + snapshotTickWorker, + takeSnapshotForVideoWorker, +} from "lib/mq/exec/snapshotTick.ts"; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); @@ -75,4 +79,4 @@ const snapshotWorker = new Worker( snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); -}) +}); diff --git a/test/ml/akari.json b/test/ml/akari.json index 7345078..9de1219 100644 --- a/test/ml/akari.json +++ b/test/ml/akari.json @@ -1,22 +1,22 @@ { - "test1": [ - { - "title": "【洛天依】《一花依世界》(2024重调版)|“抬头仰望,夜空多安详”【原创PV付】", - "desc": "本家:BV1Vs411H7JH\n作曲:LS\n作词:杏花包子\n调教:鬼面P\n混音:虎皮猫P\n演唱:洛天依\n曲绘:山下鸭鸭窝\n映像:阿妍\n——————————————————————\n本稿为同人二创,非本家重制", - "tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语", - "label": 2 - }, - { - "title": "【鏡音レン】アカシア【VOCALOID Cover】", - "desc": "鏡音リン・レン 13th Anniversary\n\nMusic:BUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nust:Maplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger:鏡音レン\n\n氷雨ハルカ\nYoutube :https://t.co/8zuv6g7Acm\nniconico:https://t.co/C6DRfdYAp0\ntwitter :https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.", - "tags": "鏡音レン", - "label": 0 - }, - { - "title": "【洛天依原创曲】谪星【姆斯塔之谕】", - "desc": "谪星\n\n策划/世界观:听雨\n作词:听雨\n作曲/编曲:太白\n混音:虎皮猫\n人设:以木\n曲绘:Ar极光\n调校:哈士奇p\n视频:苏卿白", - "tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID", - "label": 1 - } - ] + "test1": [ + { + "title": "【洛天依】《一花依世界》(2024重调版)|“抬头仰望,夜空多安详”【原创PV付】", + "desc": "本家:BV1Vs411H7JH\n作曲:LS\n作词:杏花包子\n调教:鬼面P\n混音:虎皮猫P\n演唱:洛天依\n曲绘:山下鸭鸭窝\n映像:阿妍\n——————————————————————\n本稿为同人二创,非本家重制", + "tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语", + "label": 2 + }, + { + "title": "【鏡音レン】アカシア【VOCALOID Cover】", + "desc": "鏡音リン・レン 13th Anniversary\n\nMusic:BUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nust:Maplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger:鏡音レン\n\n氷雨ハルカ\nYoutube :https://t.co/8zuv6g7Acm\nniconico:https://t.co/C6DRfdYAp0\ntwitter :https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.", + "tags": "鏡音レン", + "label": 0 + }, + { + "title": "【洛天依原创曲】谪星【姆斯塔之谕】", + "desc": "谪星\n\n策划/世界观:听雨\n作词:听雨\n作曲/编曲:太白\n混音:虎皮猫\n人设:以木\n曲绘:Ar极光\n调校:哈士奇p\n视频:苏卿白", + "tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID", + "label": 1 + } + ] } diff --git a/test/ml/akari.test.ts b/test/ml/akari.test.ts index 958f34d..f254a01 100644 --- a/test/ml/akari.test.ts +++ b/test/ml/akari.test.ts @@ -4,43 +4,43 @@ import { join } from "$std/path/join.ts"; import { SECOND } from "$std/datetime/constants.ts"; Deno.test("Akari AI - normal cases accuracy test", async () => { - const path = import.meta.dirname!; - const dataPath = join(path, "akari.json"); - const rawData = await Deno.readTextFile(dataPath); - const data = JSON.parse(rawData); - await Akari.init(); - for (const testCase of data.test1) { - const result = await Akari.classifyVideo( - testCase.title, - testCase.desc, - testCase.tags - ); - assertEquals(result, testCase.label); - } + const path = import.meta.dirname!; + const dataPath = join(path, "akari.json"); + const rawData = await Deno.readTextFile(dataPath); + const data = JSON.parse(rawData); + await Akari.init(); + for (const testCase of data.test1) { + const result = await Akari.classifyVideo( + testCase.title, + testCase.desc, + testCase.tags, + ); + assertEquals(result, testCase.label); + } }); Deno.test("Akari AI - performance test", async () => { - const path = import.meta.dirname!; - const dataPath = join(path, "akari.json"); - const rawData = await Deno.readTextFile(dataPath); - const data = JSON.parse(rawData); - await Akari.init(); - const N = 200; - const testCase = data.test1[0]; - const title = testCase.title; - const desc = testCase.desc; - const tags = testCase.tags; - const time = performance.now(); - for (let i = 0; i < N; i++){ - await Akari.classifyVideo( - title, - desc, - tags - ); - } - const end = performance.now(); - const elapsed = (end - time) / SECOND; - const throughput = N / elapsed; - assertGreaterOrEqual(throughput, 100); - console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`) -}); \ No newline at end of file + const path = import.meta.dirname!; + const dataPath = join(path, "akari.json"); + const rawData = await Deno.readTextFile(dataPath); + const data = JSON.parse(rawData); + await Akari.init(); + const N = 200; + const testCase = data.test1[0]; + const title = testCase.title; + const desc = testCase.desc; + const tags = testCase.tags; + const time = performance.now(); + for (let i = 0; i < N; i++) { + await Akari.classifyVideo( + title, + desc, + tags, + ); + } + const end = performance.now(); + const elapsed = (end - time) / SECOND; + const throughput = N / elapsed; + assertGreaterOrEqual(throughput, 100); + console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`); +});