Compare commits

...

27 Commits

Author SHA1 Message Date
35d58be8fd
fix: may accessing a non-existent schedule 2025-03-23 21:42:05 +08:00
3a0dd26c68
fix: only 1 pending task at a time 2025-03-23 21:36:40 +08:00
9e764746fb
update: better task retrieval for snapshotTick
add: logging when snapshot the video
2025-03-23 20:49:41 +08:00
33c63fc29f
fix: NaN caused by not converting date string to timestamp 2025-03-23 20:41:13 +08:00
b654eb3643
fix: potential NaN as delay time when scheduling 2025-03-23 20:30:06 +08:00
7768a202b2
fix: failed to serialize bigint correctly 2025-03-23 20:23:00 +08:00
8652ac8fb7
fix: bugs of snapshot scheduling 2025-03-23 19:45:52 +08:00
18fc9752bb
fix: several bugs of snapshot scheduling 2025-03-23 18:46:25 +08:00
2c12310e8c
feat: snapshot based on persistent schedule 2025-03-23 17:44:59 +08:00
b201bfd64d
fix: add type assertions to suppress errors 2025-03-22 23:49:48 +08:00
e38dc96275
update: insertion of snapshot schedule 2025-03-22 20:51:28 +08:00
9f9ef800d1
merge: hotfix into ref/snapshot 2025-03-22 01:48:03 +08:00
e5534cda24
fix: incorrect filter condition that causes empty tags 2025-03-22 00:58:36 +08:00
559c63b434
update: more beautiful time interval formatting 2025-03-22 00:42:37 +08:00
1895d601d9
update: dynamic delay factor for snapshotMilestoneVideo 2025-03-22 00:40:00 +08:00
fabb77d98d
fix: inefficient SQL query for getting songs close to milestone 2025-03-22 00:28:47 +08:00
8158ce10c0
fix: inserting videos into songs table regardless of classified label 2025-03-21 21:06:01 +08:00
00b52c01f7
fix: unexpected column bvid when inserting to songs table 2025-03-21 20:51:34 +08:00
2e8ed7ce70
add: ETA estimation for short-term snapshot 2025-03-20 01:57:33 +08:00
cd8aa826e1
fix: prevent videos from being crawled for too long 2025-03-17 00:33:28 +08:00
b07d0c18f9
update: preparation for snapshotSchedule 2025-03-17 00:25:31 +08:00
a9ac8de547
fix: unhandled timezone mismatch when inserting to database 2025-03-16 14:23:11 +08:00
0ff1c78dcc
fix: incorrect timestamp unit when inserting to database 2025-03-16 14:00:49 +08:00
a6c8fd7f3f
ref: code structure related to AI 2025-03-16 01:23:10 +08:00
7104a95af9
ref: rename table all_data, bili_user to bilibili_metadata, bilibili_user 2025-03-15 21:27:19 +08:00
5af2236109
temp: remove the scheduleSnapshotTick job 2025-03-15 21:25:26 +08:00
93bdddc21e
fix: unexpectedly commented code to upsert jobs 2025-03-15 16:51:07 +08:00
27 changed files with 907 additions and 605 deletions

View File

@@ -1,21 +1,24 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
- import { modelVersion } from "lib/ml/filter_inference.ts";
+ import Akari from "lib/ml/akari.ts";
export async function videoExistsInAllData(client: Client, aid: number) {
-     return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
+     return await client.queryObject<{ exists: boolean }>(
+         `SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`,
+         [aid],
+     )
        .then((result) => result.rows[0].exists);
}
export async function userExistsInBiliUsers(client: Client, uid: number) {
-     return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [
+     return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [
        uid,
    ]);
}
export async function getUnlabelledVideos(client: Client) {
    const queryResult = await client.queryObject<{ aid: number }>(
-         `SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
+         `SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
    );
    return queryResult.rows.map((row) => row.aid);
}
@@ -23,20 +26,20 @@ export async function getUnlabelledVideos(client: Client) {
export async function insertVideoLabel(client: Client, aid: number, label: number) {
    return await client.queryObject(
        `INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
-         [aid, label, modelVersion],
+         [aid, label, Akari.getModelVersion()],
    );
}
export async function getVideoInfoFromAllData(client: Client, aid: number) {
    const queryResult = await client.queryObject<AllDataType>(
-         `SELECT * FROM all_data WHERE aid = $1`,
+         `SELECT * FROM bilibili_metadata WHERE aid = $1`,
        [aid],
    );
    const row = queryResult.rows[0];
    let authorInfo = "";
    if (row.uid && await userExistsInBiliUsers(client, row.uid)) {
        const q = await client.queryObject<BiliUserType>(
-             `SELECT * FROM bili_user WHERE uid = $1`,
+             `SELECT * FROM bilibili_user WHERE uid = $1`,
            [row.uid],
        );
        const userRow = q.rows[0];
@@ -56,8 +59,8 @@ export async function getUnArchivedBiliUsers(client: Client) {
    const queryResult = await client.queryObject<{ uid: number }>(
        `
        SELECT ad.uid
-         FROM all_data ad
-         LEFT JOIN bili_user bu ON ad.uid = bu.uid
+         FROM bilibili_metadata ad
+         LEFT JOIN bilibili_user bu ON ad.uid = bu.uid
        WHERE bu.uid IS NULL;
        `,
        [],
@@ -65,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
    const rows = queryResult.rows;
    return rows.map((row) => row.uid);
}
+ export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
+     return await client.queryObject(
+         `UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
+         [status, aid],
+     );
+ }

22
lib/db/schema.d.ts vendored
View File

@@ -31,3 +31,25 @@ export interface VideoSnapshotType {
    aid: bigint;
    replies: number;
}
export interface LatestSnapshotType {
aid: number;
time: number;
views: number;
danmakus: number;
replies: number;
likes: number;
coins: number;
shares: number;
favorites: number;
}
export interface SnapshotScheduleType {
id: number;
aid: number;
type?: string;
created_at: string;
started_at?: string;
finished_at?: string;
status: string;
}

View File

@@ -1,44 +1,17 @@
- import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
- import { VideoSnapshotType } from "lib/db/schema.d.ts";
- import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
+ import { LatestSnapshotType } from "lib/db/schema.d.ts";
- export async function getSongsNearMilestone(client: Client) {
-     const queryResult = await client.queryObject<VideoSnapshotType>(`
-         WITH max_views_per_aid AS (
-             -- aid views aid songs
-             SELECT
-                 vs.aid,
-                 MAX(vs.views) AS max_views
-             FROM
-                 video_snapshot vs
-             INNER JOIN
-                 songs s
-             ON
-                 vs.aid = s.aid
-             GROUP BY
-                 vs.aid
-         ),
-         filtered_max_views AS (
-             -- views
-             SELECT
-                 aid,
-                 max_views
-             FROM
-                 max_views_per_aid
-             WHERE
-                 (max_views >= 90000 AND max_views < 100000) OR
-                 (max_views >= 900000 AND max_views < 1000000)
-         )
-         --
-         SELECT
-             vs.*
-         FROM
-             video_snapshot vs
-         INNER JOIN
-             filtered_max_views fmv
-         ON
-             vs.aid = fmv.aid AND vs.views = fmv.max_views
+ export async function getVideosNearMilestone(client: Client) {
+     const queryResult = await client.queryObject<LatestSnapshotType>(`
+         SELECT ls.*
+         FROM latest_video_snapshot ls
+         INNER JOIN
+             songs s ON ls.aid = s.aid
+         WHERE
+             s.deleted = false AND
+             (views >= 90000 AND views < 100000) OR
+             (views >= 900000 AND views < 1000000) OR
+             (views >= 9900000 AND views < 10000000)
    `);
    return queryResult.rows.map((row) => {
        return {
@@ -48,143 +21,20 @@ export async function getSongsNearMilestone(client: Client) {
    });
}
- export async function getUnsnapshotedSongs(client: Client) {
-     const queryResult = await client.queryObject<{ aid: bigint }>(`
-         SELECT DISTINCT s.aid
-         FROM songs s
-         LEFT JOIN video_snapshot v ON s.aid = v.aid
-         WHERE v.aid IS NULL;
-     `);
-     return queryResult.rows.map((row) => Number(row.aid));
- }
- export async function getSongSnapshotCount(client: Client, aid: number) {
-     const queryResult = await client.queryObject<{ count: number }>(
-         `
-         SELECT COUNT(*) AS count
-         FROM video_snapshot
-         WHERE aid = $1;
-         `,
-         [aid],
-     );
-     return queryResult.rows[0].count;
- }
- export async function getShortTermEtaPrediction(client: Client, aid: number) {
-     const queryResult = await client.queryObject<{eta: number}>(
-         `
-         WITH old_snapshot AS (
-             SELECT created_at, views
-             FROM video_snapshot
-             WHERE aid = $1 AND
-                 NOW() - created_at > '20 min'
-             ORDER BY created_at DESC
-             LIMIT 1
-         ),
-         new_snapshot AS (
-             SELECT created_at, views
-             FROM video_snapshot
-             WHERE aid = $1
-             ORDER BY created_at DESC
-             LIMIT 1
-         )
-         SELECT
-             CASE
-                 WHEN n.views > 100000
-                 THEN
-                     (1000000 - n.views) -- Views remaining
-                     /
-                     (
-                         (n.views - o.views) -- Views delta
-                         /
-                         (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
-                         + 0.001
-                     ) -- Increment per second
-                 ELSE
-                     (100000 - n.views) -- Views remaining
-                     /
-                     (
-                         (n.views - o.views) -- Views delta
-                         /
-                         (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
-                         + 0.001
-                     ) -- Increment per second
-             END AS eta
-         FROM old_snapshot o, new_snapshot n;
-         `,
-         [aid],
-     );
-     if (queryResult.rows.length === 0) {
-         return null;
-     }
-     return queryResult.rows[0].eta;
- }
- export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
-     const count = await getSongSnapshotCount(client, aid);
-     if (count < 2) {
-         return true;
-     }
-     const queryResult = await client.queryObject<
-         { views1: number; created_at1: string; views2: number; created_at2: string }
-     >(
-         `
-         WITH latest_snapshot AS (
-             SELECT
-                 aid,
-                 views,
-                 created_at
-             FROM video_snapshot
-             WHERE aid = $1
-             ORDER BY created_at DESC
-             LIMIT 1
-         ),
-         pairs AS (
-             SELECT
-                 a.views AS views1,
-                 a.created_at AS created_at1,
-                 b.views AS views2,
-                 b.created_at AS created_at2,
-                 (b.created_at - a.created_at) AS interval
-             FROM video_snapshot a
-             JOIN latest_snapshot b
-                 ON a.aid = b.aid
-                 AND a.created_at < b.created_at
-         )
-         SELECT
-             views1,
-             created_at1,
-             views2,
-             created_at2
-         FROM (
-             SELECT
-                 *,
-                 ROW_NUMBER() OVER (
-                     ORDER BY
-                         CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
-                         CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
-                 ) AS rn
-             FROM pairs
-         ) ranked
-         WHERE rn = 1;
-         `,
-         [aid],
-     );
-     if (queryResult.rows.length === 0) {
-         return true;
-     }
-     const recentViewsData = queryResult.rows[0];
-     const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
-     const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
-     const intervalSec = (time2 - time1) / SECOND;
-     const views1 = recentViewsData.views1;
-     const views2 = recentViewsData.views2;
-     const viewsDiff = views2 - views1;
-     if (viewsDiff == 0) {
-         return false;
-     }
-     const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
-     const expectedViewsDiff = nextMilestone - views2;
-     const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
-     return expectedIntervalSec <= 3 * DAY;
- }
+ export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
+     const queryResult = await client.queryObject<LatestSnapshotType>(`
+         SELECT *
+         FROM latest_video_snapshot
+         WHERE aid = $1
+     `, [aid]);
+     if (queryResult.rows.length === 0) {
+         return null;
+     }
+     return queryResult.rows.map((row) => {
+         return {
+             ...row,
+             aid: Number(row.aid),
+             time: new Date(row.time).getTime(),
+         }
+     })[0];
+ }

228
lib/db/snapshotSchedule.ts Normal file
View File

@@ -0,0 +1,228 @@
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
import {SnapshotScheduleType} from "./schema.d.ts";
import logger from "lib/log/logger.ts";
export async function snapshotScheduleExists(client: Client, id: number) {
const res = await client.queryObject<{ id: number }>(
`SELECT id FROM snapshot_schedule WHERE id = $1`,
[id],
);
return res.rows.length > 0;
}
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
[aid],
);
return res.rows.length > 0;
}
export async function videoHasProcessingSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
[aid],
);
return res.rows.length > 0;
}
interface Snapshot {
created_at: number;
views: number;
}
export async function findClosestSnapshot(
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz)))
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
[aid],
);
return res.rows[0].count >= 2;
}
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
const res = await client.queryObject<{ created_at: string; views: number }>(
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
[aid],
);
if (res.rows.length === 0) return null;
const row = res.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
/*
* Returns the number of snapshot schedules within the specified range.
* @param client The database client.
* @param start The start time of the range. (Timestamp in milliseconds)
* @param end The end time of the range. (Timestamp in milliseconds)
*/
export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) {
const startTimeString = formatTimestampToPsql(start);
const endTimeString = formatTimestampToPsql(end);
const query = `
SELECT COUNT(*) FROM snapshot_schedule
WHERE started_at BETWEEN $1 AND $2
AND status = 'pending'
`;
const res = await client.queryObject<{ count: number }>(query, [startTimeString, endTimeString]);
return res.rows[0].count;
}
/*
* Creates a new snapshot schedule record.
* @param client The database client.
* @param aid The aid of the video.
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
if (await videoHasActiveSchedule(client, aid)) return;
const allowedCount = type === "milestone" ? 2000 : 800;
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
[aid, type, adjustedTime.toISOString()],
);
}
/**
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in the 5-minutes windows.
* @returns The adjusted actual snapshot time
*/
export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date,
allowedCounts: number = 2000
): Promise<Date> {
const findWindowQuery = `
WITH windows AS (
SELECT generate_series(
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
$2::timestamp, -- End time: 24 hours after the target time window starts
INTERVAL '5 MINUTES'
) AS window_start
)
SELECT w.window_start
FROM windows w
LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
AND s.status = 'pending'
GROUP BY w.window_start
HAVING COUNT(s.*) < ${allowedCounts}
ORDER BY w.window_start
LIMIT 1;
`;
const now = new Date();
const targetTime = expectedStartTime.getTime();
let start = new Date(targetTime - 2 * HOUR);
if (start.getTime() <= now.getTime()) {
start = now;
}
const startTruncated = truncateTo5MinInterval(start);
const end = new Date(startTruncated.getTime() + 1 * DAY);
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[startTruncated, end],
);
const windowStart = windowResult.rows[0]?.window_start;
if (!windowStart) {
return expectedStartTime;
}
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} else {
return expectedStartTime;
}
}
/**
* Truncate the timestamp to the nearest 5-minute interval
* @param timestamp The timestamp
* @returns The truncated time
*/
function truncateTo5MinInterval(timestamp: Date): Date {
const minutes = timestamp.getMinutes() - (timestamp.getMinutes() % 5);
return new Date(
timestamp.getFullYear(),
timestamp.getMonth(),
timestamp.getDate(),
timestamp.getHours(),
minutes,
0,
0,
);
}
export async function getSnapshotsInNextSecond(client: Client) {
const query = `
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
ORDER BY
CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 3;
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;
}
export async function setSnapshotStatus(client: Client, id: number, status: string) {
return await client.queryObject(
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
[id, status],
);
}
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
const query: string = `
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`;
const res = await client.queryObject<{ aid: number }>(query, []);
return res.rows.map((r) => Number(r.aid));
}
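
For orientation, here is a minimal usage sketch (not part of the changeset) of how a caller might drive these helpers; the aid is a placeholder, and the import specifiers assume the project's import map as used elsewhere in this compare view:

import { HOUR } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { scheduleSnapshot, videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts";

const client = await db.connect();
try {
    const aid = 170001; // placeholder aid
    // scheduleSnapshot() already returns early when an active schedule exists;
    // the explicit check just makes the intent visible.
    if (!(await videoHasActiveSchedule(client, aid))) {
        await scheduleSnapshot(client, aid, "normal", Date.now() + 24 * HOUR);
    }
} finally {
    client.release();
}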

107
lib/ml/akari.ts Normal file
View File

@@ -0,0 +1,107 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/akari/3.17.onnx";
const onnxEmbeddingPath = "./model/embedding/model.onnx";
class AkariProto extends AIManager {
private tokenizer: PreTrainedTokenizer | null = null;
private readonly modelVersion = "3.17";
constructor() {
super();
this.models = {
"classifier": onnxClassifierPath,
"embedding": onnxEmbeddingPath,
};
}
public override async init(): Promise<void> {
await super.init();
await this.initJinaTokenizer();
}
private tokenizerInitialized(): boolean {
return this.tokenizer !== null;
}
private getTokenizer(): PreTrainedTokenizer {
if (!this.tokenizerInitialized()) {
throw new Error("Tokenizer is not initialized. Call init() first.");
}
return this.tokenizer!;
}
private async initJinaTokenizer(): Promise<void> {
if (this.tokenizerInitialized()) {
return;
}
try {
this.tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
logger.log("Tokenizer initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initTokenizer");
}
}
private async getJinaEmbeddings1024(texts: string[]): Promise<number[]> {
const tokenizer = this.getTokenizer();
const session = this.getModelSession("embedding");
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
private async runClassification(embeddings: number[]): Promise<number[]> {
const session = this.getModelSession("classifier");
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await session.run({ channel_features: inputTensor });
return this.softmax(logits.data as Float32Array);
}
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
const embeddings = await this.getJinaEmbeddings1024([
title,
description,
tags,
]);
const probabilities = await this.runClassification(embeddings);
if (aid) {
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
}
return probabilities.indexOf(Math.max(...probabilities));
}
public getModelVersion(): string {
return this.modelVersion;
}
}
const Akari = new AkariProto();
export default Akari;
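
A minimal consumption sketch (not from the diff itself), mirroring how the classify worker later in this compare view calls the singleton; the input strings and aid are placeholders:

import Akari from "lib/ml/akari.ts";

await Akari.init(); // loads the classifier and embedding ONNX sessions plus the Jina tokenizer
// classifyVideo embeds title/description/tags and returns the index of the most probable class.
const label = await Akari.classifyVideo("some title", "some description", "tag1,tag2", 170001);
console.log(`predicted label: ${label}`);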

View File

@@ -1,6 +1,12 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
- import { softmax } from "lib/ml/filter_inference.ts";
+ function softmax(logits: Float32Array): number[] {
+     const maxLogit = Math.max(...logits);
+     const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
+     const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
+     return Array.from(exponents.map((exp) => exp / sumOfExponents));
+ }
// 配置参数
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";

View File

@@ -1,99 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.17";
let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: PreTrainedTokenizer | null = null;
export async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) {
return;
}
try {
tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
const [classifierSession, embeddingSession] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
]);
sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession;
logger.log("Filter models initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initializeModels");
}
}
export function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
if (!tokenizer) {
throw new Error("Tokenizer is not initialized. Call initializeModels() first.");
}
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
}
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
export async function classifyVideo(
title: string,
description: string,
tags: string,
aid: number,
): Promise<number> {
if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
}
const embeddings = await getONNXEmbeddings([
title,
description,
tags,
], sessionEmbedding);
const probabilities = await runClassification(embeddings);
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
return probabilities.indexOf(Math.max(...probabilities));
}

37
lib/ml/manager.ts Normal file
View File

@@ -0,0 +1,37 @@
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {};
public models: { [key: string]: string } = {};
constructor() {
}
public async init() {
const modelKeys = Object.keys(this.models);
for (const key of modelKeys) {
try {
this.sessions[key] = await ort.InferenceSession.create(this.models[key]);
logger.log(`Model ${key} initialized`, "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:init");
}
}
}
public getModelSession(key: string): ort.InferenceSession {
if (this.sessions[key] === undefined) {
throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession");
}
return this.sessions[key];
}
public softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
}

22
lib/ml/mantis.ts Normal file
View File

@@ -0,0 +1,22 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
const modelPath = "./model/model.onnx";
class MantisProto extends AIManager {
constructor() {
super();
this.models = {
"predictor": modelPath,
};
}
public override async init(): Promise<void> {
await super.init();
}
}
const Mantis = new MantisProto();
export default Mantis;

View File

@@ -1,6 +1,12 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
- import { softmax } from "lib/ml/filter_inference.ts";
+ function softmax(logits: Float32Array): number[] {
+     const maxLogit = Math.max(...logits);
+     const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
+     const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
+     return Array.from(exponents.map((exp) => exp / sumOfExponents));
+ }
// 配置参数
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";

View File

@@ -1,7 +1,7 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
- import { classifyVideo } from "lib/ml/filter_inference.ts";
+ import Akari from "lib/ml/akari.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
@@ -19,14 +19,14 @@ export const classifyVideoWorker = async (job: Job) => {
    const title = videoInfo.title?.trim() || "untitled";
    const description = videoInfo.description?.trim() || "N/A";
    const tags = videoInfo.tags?.trim() || "empty";
-     const label = await classifyVideo(title, description, tags, aid);
+     const label = await Akari.classifyVideo(title, description, tags, aid);
    if (label == -1) {
        logger.warn(`Failed to classify video ${aid}`, "ml");
    }
    await insertVideoLabel(client, aid, label);
    const exists = await aidExistsInSongs(client, aid);
-     if (!exists) {
+     if (!exists && label !== 0) {
        await insertIntoSongs(client, aid);
    }

View File

@@ -1,228 +1,199 @@
import { Job } from "bullmq";
- import { MINUTE, SECOND } from "$std/datetime/constants.ts";
- import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { db } from "lib/db/init.ts";
+ import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
import {
-     getShortTermEtaPrediction,
-     getSongsNearMilestone,
-     getUnsnapshotedSongs,
-     songEligibleForMilestoneSnapshot,
- } from "lib/db/snapshot.ts";
- import { SnapshotQueue } from "lib/mq/index.ts";
- import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
- import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
- import { redis } from "lib/db/redis.ts";
- import { NetSchedulerError } from "lib/mq/scheduler.ts";
+     findClosestSnapshot,
+     getLatestSnapshot,
+     getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
+     hasAtLeast2Snapshots,
+     scheduleSnapshot,
+     setSnapshotStatus,
+     snapshotScheduleExists,
+     videoHasProcessingSchedule,
+ } from "lib/db/snapshotSchedule.ts";
+ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+ import { WEEK, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
- import { formatSeconds } from "lib/utils/formatSeconds.ts";
+ import { SnapshotQueue } from "lib/mq/index.ts";
+ import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
+ import { NetSchedulerError } from "lib/mq/scheduler.ts";
+ import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts";
- async function snapshotScheduled(aid: number) {
-     try {
-         return await redis.exists(`cvsa:snapshot:${aid}`);
-     } catch {
-         logger.error(`Failed to check scheduled status for ${aid}`, "mq");
-         return false;
-     }
- }
- async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
-     try {
-         if (value) {
-             await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
-         } else {
-             await redis.del(`cvsa:snapshot:${aid}`);
-         }
-     } catch {
-         logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
-     }
- }
- interface SongNearMilestone {
-     aid: number;
-     id: number;
-     created_at: string;
-     views: number;
-     coins: number;
-     likes: number;
-     favorites: number;
-     shares: number;
-     danmakus: number;
-     replies: number;
- }
- async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) {
-     let i = 0;
-     for (const snapshot of vidoesNearMilestone) {
-         if (await snapshotScheduled(snapshot.aid)) {
-             logger.silly(
-                 `Video ${snapshot.aid} is already scheduled for snapshot`,
-                 "mq",
-                 "fn:processMilestoneSnapshots",
-             );
-             continue;
-         }
-         if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
-             logger.silly(
-                 `Video ${snapshot.aid} is not eligible for milestone snapshot`,
-                 "mq",
-                 "fn:processMilestoneSnapshots",
-             );
-             continue;
-         }
-         const factor = Math.floor(i / 8);
-         const delayTime = factor * SECOND * 2;
-         await SnapshotQueue.add("snapshotMilestoneVideo", {
-             aid: snapshot.aid,
-             currentViews: snapshot.views,
-             snapshotedAt: snapshot.created_at,
-         }, { delay: delayTime, priority: 1 });
-         await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
-         i++;
-     }
- }
- async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
-     let i = 0;
-     for (const aid of unsnapshotedVideos) {
-         if (await snapshotScheduled(aid)) {
-             logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
-             continue;
-         }
-         const factor = Math.floor(i / 5);
-         const delayTime = factor * SECOND * 4;
-         await SnapshotQueue.add("snapshotVideo", {
-             aid,
-         }, { delay: delayTime, priority: 3 });
-         await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-         i++;
-     }
- }
+ const priorityMap: { [key: string]: number } = {
+     "milestone": 1,
+     "normal": 3,
+ };
+ const snapshotTypeToTaskMap: { [key: string]: string } = {
+     "milestone": "snapshotMilestoneVideo",
+     "normal": "snapshotVideo",
+ };
export const snapshotTickWorker = async (_job: Job) => {
    const client = await db.connect();
    try {
-         const vidoesNearMilestone = await getSongsNearMilestone(client);
-         await processMilestoneSnapshots(client, vidoesNearMilestone);
-         const unsnapshotedVideos = await getUnsnapshotedSongs(client);
-         await processUnsnapshotedVideos(unsnapshotedVideos);
+         const schedules = await getSnapshotsInNextSecond(client);
+         for (const schedule of schedules) {
+             let priority = 3;
+             if (schedule.type && priorityMap[schedule.type]) {
+                 priority = priorityMap[schedule.type];
+             }
+             const aid = Number(schedule.aid);
+             await SnapshotQueue.add("snapshotVideo", {
+                 aid: aid,
+                 id: Number(schedule.id),
+                 type: schedule.type ?? "normal",
+             }, { priority });
+         }
+     } catch (e) {
+         logger.error(e as Error);
    } finally {
        client.release();
    }
};
- export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
+ export const closetMilestone = (views: number) => {
+     if (views < 100000) return 100000;
+     if (views < 1000000) return 1000000;
+     return 10000000;
+ };
+ const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
+ /*
+  * Returns the minimum ETA in hours for the next snapshot
+  * @param client - Postgres client
+  * @param aid - aid of the video
+  * @returns ETA in hours
+  */
+ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
+     const latestSnapshot = await getLatestSnapshot(client, aid);
+     // Immediately dispatch a snapshot if there is no snapshot yet
+     if (!latestSnapshot) return 0;
+     const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
+     if (!snapshotsEnough) return 0;
+     const currentTimestamp = new Date().getTime();
+     const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
+     const DELTA = 0.00001;
+     let minETAHours = Infinity;
+     for (const timeInterval of timeIntervals) {
+         const date = new Date(currentTimestamp - timeInterval);
+         const snapshot = await findClosestSnapshot(client, aid, date);
+         if (!snapshot) continue;
+         const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
+         const viewsDiff = latestSnapshot.views - snapshot.views;
+         if (viewsDiff <= 0) continue;
+         const speed = viewsDiff / (hoursDiff + DELTA);
+         const target = closetMilestone(latestSnapshot.views);
+         const viewsToIncrease = target - latestSnapshot.views;
+         const eta = viewsToIncrease / (speed + DELTA);
+         let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
+         factor = truncate(factor, 3, 100);
+         const adjustedETA = eta / factor;
+         if (adjustedETA < minETAHours) {
+             minETAHours = adjustedETA;
+         }
+     }
+     if (isNaN(minETAHours)) {
+         minETAHours = Infinity;
+     }
+     return minETAHours;
+ };
+ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
    const client = await db.connect();
-     await setSnapshotScheduled(job.data.aid, true, 20 * 60);
    try {
-         const aid: number = job.data.aid;
-         const currentViews: number = job.data.currentViews;
-         const lastSnapshoted: string = job.data.snapshotedAt;
-         const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
-         if (typeof stat === "number") {
-             if (stat === -404 || stat === 62002 || stat == 62012) {
-                 await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-             } else {
-                 await setSnapshotScheduled(aid, false, 0);
-             }
-             return;
-         }
-         const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
-         if (stat.views >= nextMilestone) {
-             await setSnapshotScheduled(aid, false, 0);
-             return;
-         }
-         let eta = await getShortTermEtaPrediction(client, aid);
-         if (eta === null) {
-             const DELTA = 0.001;
-             const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
-             const viewsIncrement = stat.views - currentViews;
-             const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
-             const viewsToIncrease = nextMilestone - stat.views;
-             eta = viewsToIncrease / (incrementSpeed + DELTA);
-         }
-         const scheduledNextSnapshotDelay = eta * SECOND / 3;
-         const maxInterval = 20 * MINUTE;
+         const videos = await getVideosNearMilestone(client);
+         for (const video of videos) {
+             const aid = Number(video.aid);
+             const eta = await getAdjustedShortTermETA(client, aid);
+             if (eta > 72) continue;
+             const now = Date.now();
+             const scheduledNextSnapshotDelay = eta * HOUR;
+             const maxInterval = 4 * HOUR;
        const minInterval = 1 * SECOND;
        const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
-         await SnapshotQueue.add("snapshotMilestoneVideo", {
-             aid,
-             currentViews: stat.views,
-             snapshotedAt: stat.time,
-         }, { delay, priority: 1 });
-         await job.updateData({
-             ...job.data,
-             updatedViews: stat.views,
-             updatedTime: new Date(stat.time).toISOString(),
-             etaInMins: eta / 60,
-         });
-         logger.log(
-             `Scheduled next milestone snapshot for ${aid} in ${
-                 formatSeconds(delay / 1000)
-             }, current views: ${stat.views}`,
-             "mq",
-         );
-     } catch (e) {
-         if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
-             logger.warn(
-                 `No available proxy for aid ${job.data.aid}.`,
-                 "mq",
-                 "fn:takeSnapshotForMilestoneVideoWorker",
-             );
-             await SnapshotQueue.add("snapshotMilestoneVideo", {
-                 aid: job.data.aid,
-                 currentViews: job.data.currentViews,
-                 snapshotedAt: job.data.snapshotedAt,
-             }, { delay: 5 * SECOND, priority: 1 });
-             return;
-         }
-         throw e;
+             const targetTime = now + delay;
+             await scheduleSnapshot(client, aid, "milestone", targetTime);
+         }
+     } catch (e) {
+         logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
+     } finally {
+         client.release();
+     }
+ };
+ export const regularSnapshotsWorker = async (_job: Job) => {
+     const client = await db.connect();
+     try {
+         const aids = await getVideosWithoutActiveSnapshotSchedule(client);
+         for (const rawAid of aids) {
+             const aid = Number(rawAid);
+             const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+             const now = Date.now();
+             const lastSnapshotedAt = latestSnapshot?.time ?? now;
+             const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
+             await scheduleSnapshot(client, aid, "normal", targetTime);
+         }
+     } catch (e) {
+         logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
    } finally {
        client.release();
    }
};
export const takeSnapshotForVideoWorker = async (job: Job) => {
+     const id = job.data.id;
+     const aid = Number(job.data.aid);
+     const type = job.data.type;
+     const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
    const client = await db.connect();
-     await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
+     const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
+     const exists = await snapshotScheduleExists(client, id);
+     if (!exists) {
+         return;
+     }
    try {
-         const { aid } = job.data;
-         const stat = await insertVideoStats(client, aid, "getVideoInfo");
+         if (await videoHasProcessingSchedule(client, aid)) {
+             return `ALREADY_PROCESSING`;
+         }
+         await setSnapshotStatus(client, id, "processing");
+         const stat = await insertVideoSnapshot(client, aid, task);
        if (typeof stat === "number") {
-             if (stat === -404 || stat === 62002 || stat == 62012) {
-                 await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-             } else {
-                 await setSnapshotScheduled(aid, false, 0);
-             }
+             await setBiliVideoStatus(client, aid, stat);
+             await setSnapshotStatus(client, id, "completed");
+             return `BILI_STATUS_${stat}`;
        }
-         return;
+         await setSnapshotStatus(client, id, "completed");
+         if (type === "normal") {
+             await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
+             return `DONE`;
        }
-         logger.log(`Taken snapshot for ${aid}`, "mq");
-         if (stat == null) {
-             setSnapshotScheduled(aid, false, 0);
-             return;
-         }
-         await job.updateData({
-             ...job.data,
-             updatedViews: stat.views,
-             updatedTime: new Date(stat.time).toISOString(),
-         });
-         const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
-             (stat.views >= 900000 && stat.views < 1000000);
-         if (nearMilestone) {
-             await SnapshotQueue.add("snapshotMilestoneVideo", {
-                 aid,
-                 currentViews: stat.views,
-                 snapshotedAt: stat.time,
-             }, { delay: 0, priority: 1 });
-         }
-         await setSnapshotScheduled(aid, false, 0);
+         if (type !== "milestone") return `DONE`;
+         const eta = await getAdjustedShortTermETA(client, aid);
+         if (eta > 72) return "ETA_TOO_LONG";
+         const now = Date.now();
+         const targetTime = now + eta * HOUR;
+         await scheduleSnapshot(client, aid, type, targetTime);
+         return `DONE`;
    } catch (e) {
-         if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
-             await setSnapshotScheduled(job.data.aid, false, 0);
+         if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
+             logger.warn(
+                 `No available proxy for aid ${job.data.aid}.`,
+                 "mq",
+                 "fn:takeSnapshotForVideoWorker",
+             );
+             await setSnapshotStatus(client, id, "completed");
+             await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
            return;
        }
-         throw e;
+         logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
+         await setSnapshotStatus(client, id, "failed");
    } finally {
        client.release();
    }
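
To make the adjusted-ETA arithmetic in getAdjustedShortTermETA concrete, here is a standalone sketch with invented numbers; truncate is assumed to clamp its argument into [min, max], which matches how the worker above uses it:

// Standalone restatement of the ETA adjustment; all input numbers are invented.
const DELTA = 0.00001;
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
// Assumed behaviour of lib/utils/truncate.ts: clamp n into [min, max].
const truncate = (n: number, min: number, max: number) => Math.min(Math.max(n, min), max);

const latestViews = 95000; // hypothetical video sitting just below the 100k milestone
const olderViews = 94000;  // views about one hour earlier
const hoursDiff = 1;
const speed = (latestViews - olderViews) / (hoursDiff + DELTA); // ~1000 views per hour
const viewsToIncrease = 100000 - latestViews;                   // 5000 views left to the milestone
const rawEtaHours = viewsToIncrease / (speed + DELTA);          // ~5 hours at the current speed
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);        // negative here, so the clamp below kicks in
factor = truncate(factor, 3, 100);                              // clamped to at least 3
const adjustedEtaHours = rawEtaHours / factor;                  // ~1.67 hours: snapshot sooner than the raw ETA
console.log(adjustedEtaHours.toFixed(2));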

View File

@@ -1,24 +1,39 @@
- import { MINUTE } from "$std/datetime/constants.ts";
+ import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
export async function initMQ() {
-     // await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
-     //     every: 1 * MINUTE,
-     //     immediately: true,
-     // });
-     // await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
-     //     every: 5 * MINUTE,
-     //     immediately: true,
-     // });
-     // await LatestVideosQueue.upsertJobScheduler("collectSongs", {
-     //     every: 3 * MINUTE,
-     //     immediately: true,
-     // });
-     await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
+     await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
+         every: 1 * MINUTE,
+         immediately: true,
+     });
+     await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
+         every: 5 * MINUTE,
+         immediately: true,
+     });
+     await LatestVideosQueue.upsertJobScheduler("collectSongs", {
        every: 3 * MINUTE,
        immediately: true,
    });
+     await SnapshotQueue.upsertJobScheduler("snapshotTick", {
+         every: 1 * SECOND,
+         immediately: true,
+     }, {
+         opts: {
+             removeOnComplete: 1,
+             removeOnFail: 1,
+         },
+     });
+     await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
+         every: 5 * MINUTE,
+         immediately: true,
+     });
+     await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
+         every: 30 * MINUTE,
+         immediately: true,
+     });
    logger.log("Message queue initialized.");
}

View File

@@ -21,7 +21,7 @@ interface ProxiesMap {
}
type NetSchedulerErrorCode =
-     | "NO_AVAILABLE_PROXY"
+     | "NO_PROXY_AVAILABLE"
    | "PROXY_RATE_LIMITED"
    | "PROXY_NOT_FOUND"
    | "FETCH_ERROR"
@@ -143,10 +143,10 @@ class NetScheduler {
     * @param {string} method - The HTTP method to use for the request. Default is "GET".
     * @returns {Promise<any>} - A promise that resolves to the response body.
     * @throws {NetSchedulerError} - The error will be thrown in following cases:
-      * - No available proxy currently: with error code NO_AVAILABLE_PROXY
-      * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
-      * - The native `fetch` function threw an error: with error code FETCH_ERROR
-      * - The proxy type is not supported: with error code NOT_IMPLEMENTED
+      * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
+      * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
+      * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
+      * - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
     */
    async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
        // find a available proxy
@@ -156,7 +156,7 @@ class NetScheduler {
                return await this.proxyRequest<R>(url, proxyName, task, method);
            }
        }
-         throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
+         throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
    }
    /*
@@ -168,10 +168,11 @@ class NetScheduler {
     * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
     * @returns {Promise<any>} - A promise that resolves to the response body.
     * @throws {NetSchedulerError} - The error will be thrown in following cases:
-      * - Proxy not found: with error code PROXY_NOT_FOUND
-      * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
-      * - The native `fetch` function threw an error: with error code FETCH_ERROR
-      * - The proxy type is not supported: with error code NOT_IMPLEMENTED
+      * - Proxy not found: with error code `PROXY_NOT_FOUND`
+      * - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
+      * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
+      * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
+      * - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
     */
    async proxyRequest<R>(
        url: string,
@@ -255,8 +256,6 @@ class NetScheduler {
    }
    private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
-         let rawOutput: null | Uint8Array = null;
-         let rawErr: null | Uint8Array = null;
        try {
            const decoder = new TextDecoder();
            const output = await new Deno.Command("aliyun", {
@@ -280,15 +279,9 @@ class NetScheduler {
                `CVSA-${region}`,
            ],
        }).output();
-         rawOutput = output.stdout;
-         rawErr = output.stderr;
        const out = decoder.decode(output.stdout);
        const rawData = JSON.parse(out);
        if (rawData.statusCode !== 200) {
-             const fileId = randomUUID();
-             await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
-             await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
-             logger.log(`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
            throw new NetSchedulerError(
                `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
                "ALICLOUD_PROXY_ERR",
@@ -297,12 +290,6 @@ class NetScheduler {
            return JSON.parse(JSON.parse(rawData.body)) as R;
        }
    } catch (e) {
-         if (rawOutput !== null || rawErr !== null) {
-             const fileId = randomUUID();
-             rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
-             rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
-             logger.log(`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
-         }
        logger.error(e as Error, "net", "fn:alicloudFcRequest");
        throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
    }
@@ -361,7 +348,8 @@ Execution order for setup:
    - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
    - Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
-     - Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
+     - Call after addProxy and addTask.
+     - It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
    - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
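
For reference, a condensed caller sketch (not from the changeset) showing how the renamed code is handled, patterned on the snapshot worker earlier in this compare view; the URL's aid and the logging are illustrative:

import netScheduler, { NetSchedulerError } from "lib/mq/scheduler.ts";
import { VideoInfoResponse } from "lib/net/bilibili.d.ts";

const url = "https://api.bilibili.com/x/web-interface/view?aid=170001"; // placeholder aid
try {
    const data = await netScheduler.request<VideoInfoResponse>(url, "getVideoInfo");
    console.log(data);
} catch (e) {
    if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
        // Back off and reschedule the job instead of failing it.
        console.warn("no proxy available right now, retry later");
    } else {
        throw e;
    }
}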

View File

@@ -15,12 +15,11 @@ export async function collectSongs(client: Client) {
export async function insertIntoSongs(client: Client, aid: number) {
    await client.queryObject(
        `
-         INSERT INTO songs (aid, bvid, published_at, duration)
+         INSERT INTO songs (aid, published_at, duration)
        VALUES (
            $1,
-             (SELECT bvid FROM all_data WHERE aid = $1),
-             (SELECT published_at FROM all_data WHERE aid = $1),
-             (SELECT duration FROM all_data WHERE aid = $1)
+             (SELECT published_at FROM bilibili_metadata WHERE aid = $1),
+             (SELECT duration FROM bilibili_metadata WHERE aid = $1)
        )
        ON CONFLICT DO NOTHING
        `,

View File

@@ -4,6 +4,7 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import logger from "lib/log/logger.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts";
+ import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function insertVideoInfo(client: Client, aid: number) {
    const videoExists = await videoExistsInAllData(client, aid);
@@ -18,25 +19,25 @@ export async function insertVideoInfo(client: Client, aid: number) {
    const desc = data.View.desc;
    const uid = data.View.owner.mid;
    const tags = data.Tags
-         .filter((tag) => tag.tag_type in ["old_channel", "topic"])
+         .filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
        .map((tag) => tag.tag_name).join(",");
    const title = data.View.title;
-     const published_at = formatTimestampToPsql(data.View.pubdate);
+     const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
    const duration = data.View.duration;
    await client.queryObject(
-         `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
+         `INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
        [aid, bvid, desc, uid, tags, title, published_at, duration],
    );
    const userExists = await userExistsInBiliUsers(client, aid);
    if (!userExists) {
        await client.queryObject(
-             `INSERT INTO bili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`,
+             `INSERT INTO bilibili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`,
            [uid, data.View.owner.name, data.Card.card.sign, data.Card.follower],
        );
    } else {
        await client.queryObject(
-             `UPDATE bili_user SET fans = $1 WHERE uid = $2`,
+             `UPDATE bilibili_user SET fans = $1 WHERE uid = $2`,
            [data.Card.follower, uid],
        );
    }

View File

@@ -1,12 +1,28 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { getVideoInfo } from "lib/net/getVideoInfo.ts";
+import { LatestSnapshotType } from "lib/db/schema.d.ts";
+import logger from "lib/log/logger.ts";

-export async function insertVideoStats(client: Client, aid: number, task: string) {
+/*
+ * Fetch video stats from bilibili API and insert into database
+ * @returns {Promise<number|VideoSnapshot>}
+ * A number indicating the status code when receiving non-0 status code from bilibili,
+ * otherwise an VideoSnapshot object containing the video stats
+ * @throws {NetSchedulerError} - The error will be thrown in following cases:
+ * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
+ * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
+ * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
+ */
+export async function insertVideoSnapshot(
+  client: Client,
+  aid: number,
+  task: string,
+): Promise<number | LatestSnapshotType> {
   const data = await getVideoInfo(aid, task);
-  const time = new Date().getTime();
-  if (typeof data == 'number') {
+  if (typeof data == "number") {
     return data;
   }
+  const time = new Date().getTime();
   const views = data.stat.view;
   const danmakus = data.stat.danmaku;
   const replies = data.stat.reply;
@@ -14,11 +30,19 @@ export async function insertVideoStats(client: Client, aid: number, task: string
   const coins = data.stat.coin;
   const shares = data.stat.share;
   const favorites = data.stat.favorite;
-  await client.queryObject(`
+  const query: string = `
     INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
     VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
-  `, [aid, views, danmakus, replies, likes, coins, shares, favorites]);
-  return {
+  `;
+  await client.queryObject(
+    query,
+    [aid, views, danmakus, replies, likes, coins, shares, favorites],
+  );
+  logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
+  const snapshot: LatestSnapshotType = {
     aid,
     views,
     danmakus,
@@ -27,6 +51,8 @@ export async function insertVideoStats(client: Client, aid: number, task: string
     coins,
     shares,
     favorites,
-    time
-  }
+    time,
+  };
+  return snapshot;
 }
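The renamed `insertVideoSnapshot` now returns either a bilibili status code or the freshly written snapshot. A minimal calling sketch, assuming a connected `Client` and the `"snapshotVideo"` task name used elsewhere in this change set; the module path for the import is assumed, not confirmed by this diff:

import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { insertVideoSnapshot } from "lib/db/snapshot.ts"; // assumed path

const client = new Client(postgresConfig);
await client.connect();
const result = await insertVideoSnapshot(client, 247308539, "snapshotVideo");
if (typeof result === "number") {
  // bilibili returned a non-zero status code; no row was written
  console.warn(`snapshot skipped, status code ${result}`);
} else {
  console.log(`views at ${new Date(result.time)}: ${result.views}`);
}
await client.end();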

View File

@@ -26,8 +26,8 @@ interface VideoInfoData {
    mid: number;
    name: string;
    face: string;
-  },
-  stat: VideoStats,
+  };
+  stat: VideoStats;
 }

 interface VideoDetailsData {

View File

@@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
 import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
 import logger from "lib/log/logger.ts";

+/*
+ * Fetch video metadata from bilibili API
+ * @param {number} aid - The video's aid
+ * @param {string} task - The task name used in scheduler. It can be one of the following:
+ * - snapshotVideo
+ * - getVideoInfo
+ * - snapshotMilestoneVideo
+ * @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
+ * @throws {NetSchedulerError} - The error will be thrown in following cases:
+ * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
+ * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
+ * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
+ */
 export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
   const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
   const data = await netScheduler.request<VideoInfoResponse>(url, task);
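Per the doc comment above, the two failure modes surface differently: a non-zero bilibili status code comes back as the return value, while scheduler and network problems are thrown as `NetSchedulerError`. A short calling sketch (the aid value is arbitrary):

import { getVideoInfo } from "lib/net/getVideoInfo.ts";

try {
  const info = await getVideoInfo(247308539, "getVideoInfo");
  if (typeof info === "number") {
    console.warn(`bilibili API returned error code ${info}`);
  } else {
    console.log(`current views: ${info.stat.view}`);
  }
} catch (e) {
  // NO_PROXY_AVAILABLE, FETCH_ERROR or ALICLOUD_FC_ERROR
  console.error(e);
}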

View File

@@ -1,9 +1,9 @@
 export const formatSeconds = (seconds: number) => {
   if (seconds < 60) {
-    return `${(seconds).toFixed(1)}s`;
+    return `${seconds.toFixed(1)}s`;
   }
   if (seconds < 3600) {
-    return `${Math.floor(seconds / 60)}m${seconds % 60}s`;
+    return `${Math.floor(seconds / 60)}m${(seconds % 60).toFixed(1)}s`;
   }
   return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`;
 };
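With the tweaked branch, sub-hour values now render the seconds remainder with one decimal as well. Expected outputs of the new version:

formatSeconds(5);    // "5.0s"
formatSeconds(75);   // "1m15.0s"
formatSeconds(3725); // "1h 2.08m"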

View File

@@ -4,20 +4,20 @@ from model import CompactPredictor
 import torch

 def main():
-    model = CompactPredictor(16).to('cpu', dtype=torch.float32)
-    model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
+    model = CompactPredictor(10).to('cpu', dtype=torch.float32)
+    model.load_state_dict(torch.load('./pred/checkpoints/long_term.pt'))
     model.eval()
     # inference
-    initial = 999269
+    initial = 997029
     last = initial
-    start_time = '2025-03-15 01:03:21'
-    for i in range(1, 48):
+    start_time = '2025-03-17 00:13:17'
+    for i in range(1, 120):
         hour = i / 0.5
         sec = hour * 3600
         time_d = np.log2(sec)
         data = [time_d, np.log2(initial+1), # time_delta, current_views
-                2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067, # grows_feat
-                0.043993, 0.72057, 28.000902 # time_feat
+                6.111542, 8.404707, 10.071566, 11.55888, 12.457823,# grows_feat
+                0.009225, 0.001318, 28.001814# time_feat
                 ]
         np_arr = np.array([data])
         tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
@@ -25,7 +25,7 @@ def main():
         num = output.detach().numpy()[0][0]
         views_pred = int(np.exp2(num)) + initial
         current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
-        print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
+        print(current_time.strftime('%m-%d %H:%M:%S'), views_pred, views_pred - last)
         last = views_pred

 if __name__ == '__main__':

View File

@@ -1,10 +1,10 @@
-import { Job, Worker } from "bullmq";
+import { ConnectionOptions, Job, Worker } from "bullmq";
 import { redis } from "lib/db/redis.ts";
 import logger from "lib/log/logger.ts";
 import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
 import { WorkerError } from "lib/mq/schema.ts";
 import { lockManager } from "lib/mq/lockManager.ts";
-import { initializeModels } from "lib/ml/filter_inference.ts";
+import Akari from "lib/ml/akari.ts";

 Deno.addSignalListener("SIGINT", async () => {
   logger.log("SIGINT Received: Shutting down workers...", "mq");
@@ -18,7 +18,7 @@ Deno.addSignalListener("SIGTERM", async () => {
   Deno.exit();
 });

-await initializeModels();
+Akari.init();

 const filterWorker = new Worker(
   "classifyVideo",
@@ -32,7 +32,7 @@ const filterWorker = new Worker(
       break;
     }
   },
- { connection: redis, concurrency: 2, removeOnComplete: { count: 1000 } },
+ { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } },
 );

 filterWorker.on("active", () => {

View File

@@ -1,21 +1,27 @@
-import { Job, Worker } from "bullmq";
+import { ConnectionOptions, Job, Worker } from "bullmq";
 import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts";
 import { redis } from "lib/db/redis.ts";
 import logger from "lib/log/logger.ts";
 import { lockManager } from "lib/mq/lockManager.ts";
 import { WorkerError } from "lib/mq/schema.ts";
 import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
-import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
+import {
+  collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
+  snapshotTickWorker,
+  takeSnapshotForVideoWorker,
+} from "lib/mq/exec/snapshotTick.ts";

 Deno.addSignalListener("SIGINT", async () => {
   logger.log("SIGINT Received: Shutting down workers...", "mq");
   await latestVideoWorker.close(true);
+  await snapshotWorker.close(true);
   Deno.exit();
 });

 Deno.addSignalListener("SIGTERM", async () => {
   logger.log("SIGTERM Received: Shutting down workers...", "mq");
   await latestVideoWorker.close(true);
+  await snapshotWorker.close(true);
   Deno.exit();
 });
@@ -36,7 +42,12 @@ const latestVideoWorker = new Worker(
       break;
     }
   },
- { connection: redis, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
+ {
+   connection: redis as ConnectionOptions,
+   concurrency: 6,
+   removeOnComplete: { count: 1440 },
+   removeOnFail: { count: 0 },
+ },
 );

 latestVideoWorker.on("active", () => {
@@ -56,23 +67,26 @@ const snapshotWorker = new Worker(
   "snapshot",
   async (job: Job) => {
     switch (job.name) {
-      case "scheduleSnapshotTick":
-        await snapshotTickWorker(job);
-        break;
-      case "snapshotMilestoneVideo":
-        await takeSnapshotForMilestoneVideoWorker(job);
-        break;
       case "snapshotVideo":
         await takeSnapshotForVideoWorker(job);
         break;
+      case "snapshotTick":
+        await snapshotTickWorker(job);
+        break;
+      case "collectMilestoneSnapshots":
+        await collectMilestoneSnapshotsWorker(job);
+        break;
+      case "dispatchRegularSnapshots":
+        await regularSnapshotsWorker(job);
+        break;
       default:
         break;
     }
   },
- { connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } },
+ { connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } },
 );

 snapshotWorker.on("error", (err) => {
   const e = err as WorkerError;
   logger.error(e.rawError, e.service, e.codePath);
-})
+});
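The snapshot worker now dispatches on four job names. For reference, a sketch of how matching jobs could be enqueued from the producer side with BullMQ; the queue name and job names come from the switch above, while the payload shape and repeat intervals are placeholders, not the project's real schedule:

import { ConnectionOptions, Queue } from "bullmq";
import { redis } from "lib/db/redis.ts";

const snapshotQueue = new Queue("snapshot", { connection: redis as ConnectionOptions });

// one-off snapshot of a single video (payload shape is assumed)
await snapshotQueue.add("snapshotVideo", { aid: 247308539 });

// recurring jobs; intervals below are illustrative only
await snapshotQueue.add("snapshotTick", {}, { repeat: { every: 1000 } });
await snapshotQueue.add("collectMilestoneSnapshots", {}, { repeat: { every: 5 * 60 * 1000 } });
await snapshotQueue.add("dispatchRegularSnapshots", {}, { repeat: { every: 30 * 60 * 1000 } });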

View File

@@ -0,0 +1,18 @@
import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";

Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
  const client = new Client(postgresConfig);
  try {
    const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
    assertNotEquals(result, null);
    const created_at = result!.created_at;
    const views = result!.views;
    assertInstanceOf(created_at, Date);
    assertEquals(typeof views, "number");
  } finally {
    client.end();
  }
});
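The assertions above pin down the shape of the returned row (`created_at: Date`, `views: number`). A sketch of how such a row could feed a growth-rate estimate against a current reading; the helper below is illustrative only and not part of this change set:

import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";

// Rough views-per-hour estimate between a past snapshot and a current count (illustrative).
async function estimateViewsPerHour(client: Client, aid: number, currentViews: number, now: Date) {
  const snapshot = await findClosestSnapshot(client, aid, now);
  if (!snapshot) return null;
  const hours = (now.getTime() - snapshot.created_at.getTime()) / (60 * 60 * 1000);
  return hours > 0 ? (currentViews - snapshot.views) / hours : null;
}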

22
test/ml/akari.json Normal file
View File

@@ -0,0 +1,22 @@
{
"test1": [
{
"title": "【洛天依】《一花依世界》2024重调版|“抬头仰望夜空多安详”【原创PV付】",
"desc": "本家BV1Vs411H7JH\n作曲LS\n作词杏花包子\n调教鬼面P\n混音虎皮猫P\n演唱洛天依\n曲绘山下鸭鸭窝\n映像阿妍\n——————————————————————\n本稿为同人二创非本家重制",
"tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语",
"label": 2
},
{
"title": "【鏡音レン】アカシア【VOCALOID Cover】",
"desc": "鏡音リン・レン 13th Anniversary\n\nMusicBUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nustMaplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger鏡音レン\n\n氷雨ハルカ\nYoutube https://t.co/8zuv6g7Acm\nniconicohttps://t.co/C6DRfdYAp0\ntwitter https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.",
"tags": "鏡音レン",
"label": 0
},
{
"title": "【洛天依原创曲】谪星【姆斯塔之谕】",
"desc": "谪星\n\n策划/世界观:听雨\n作词听雨\n作曲/编曲:太白\n混音虎皮猫\n人设以木\n曲绘Ar极光\n调校哈士奇p\n视频苏卿白",
"tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID",
"label": 1
}
]
}

46
test/ml/akari.test.ts Normal file
View File

@@ -0,0 +1,46 @@
import Akari from "lib/ml/akari.ts";
import { assertEquals, assertGreaterOrEqual } from "jsr:@std/assert";
import { join } from "$std/path/join.ts";
import { SECOND } from "$std/datetime/constants.ts";

Deno.test("Akari AI - normal cases accuracy test", async () => {
  const path = import.meta.dirname!;
  const dataPath = join(path, "akari.json");
  const rawData = await Deno.readTextFile(dataPath);
  const data = JSON.parse(rawData);
  await Akari.init();
  for (const testCase of data.test1) {
    const result = await Akari.classifyVideo(
      testCase.title,
      testCase.desc,
      testCase.tags,
    );
    assertEquals(result, testCase.label);
  }
});

Deno.test("Akari AI - performance test", async () => {
  const path = import.meta.dirname!;
  const dataPath = join(path, "akari.json");
  const rawData = await Deno.readTextFile(dataPath);
  const data = JSON.parse(rawData);
  await Akari.init();
  const N = 200;
  const testCase = data.test1[0];
  const title = testCase.title;
  const desc = testCase.desc;
  const tags = testCase.tags;
  const time = performance.now();
  for (let i = 0; i < N; i++) {
    await Akari.classifyVideo(
      title,
      desc,
      tags,
    );
  }
  const end = performance.now();
  const elapsed = (end - time) / SECOND;
  const throughput = N / elapsed;
  assertGreaterOrEqual(throughput, 100);
  console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`);
});