Compare commits


No commits in common. "35d58be8fd945bcf5b9576654c3e3d75b19f57c8" and "afffbd8ecb7c5dc91dbab59bc40891af497800e2" have entirely different histories.

27 changed files with 613 additions and 915 deletions


@ -1,24 +1,21 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts"; import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
import Akari from "lib/ml/akari.ts"; import { modelVersion } from "lib/ml/filter_inference.ts";
export async function videoExistsInAllData(client: Client, aid: number) { export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>( return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
`SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`,
[aid],
)
.then((result) => result.rows[0].exists); .then((result) => result.rows[0].exists);
} }
export async function userExistsInBiliUsers(client: Client, uid: number) { export async function userExistsInBiliUsers(client: Client, uid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [ return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [
uid, uid,
]); ]);
} }
export async function getUnlabelledVideos(client: Client) { export async function getUnlabelledVideos(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>( const queryResult = await client.queryObject<{ aid: number }>(
`SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`, `SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
); );
return queryResult.rows.map((row) => row.aid); return queryResult.rows.map((row) => row.aid);
} }
@ -26,20 +23,20 @@ export async function getUnlabelledVideos(client: Client) {
export async function insertVideoLabel(client: Client, aid: number, label: number) { export async function insertVideoLabel(client: Client, aid: number, label: number) {
return await client.queryObject( return await client.queryObject(
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`, `INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
[aid, label, Akari.getModelVersion()], [aid, label, modelVersion],
); );
} }
export async function getVideoInfoFromAllData(client: Client, aid: number) { export async function getVideoInfoFromAllData(client: Client, aid: number) {
const queryResult = await client.queryObject<AllDataType>( const queryResult = await client.queryObject<AllDataType>(
`SELECT * FROM bilibili_metadata WHERE aid = $1`, `SELECT * FROM all_data WHERE aid = $1`,
[aid], [aid],
); );
const row = queryResult.rows[0]; const row = queryResult.rows[0];
let authorInfo = ""; let authorInfo = "";
if (row.uid && await userExistsInBiliUsers(client, row.uid)) { if (row.uid && await userExistsInBiliUsers(client, row.uid)) {
const q = await client.queryObject<BiliUserType>( const q = await client.queryObject<BiliUserType>(
`SELECT * FROM bilibili_user WHERE uid = $1`, `SELECT * FROM bili_user WHERE uid = $1`,
[row.uid], [row.uid],
); );
const userRow = q.rows[0]; const userRow = q.rows[0];
@ -59,8 +56,8 @@ export async function getUnArchivedBiliUsers(client: Client) {
const queryResult = await client.queryObject<{ uid: number }>( const queryResult = await client.queryObject<{ uid: number }>(
` `
SELECT ad.uid SELECT ad.uid
FROM bilibili_metadata ad FROM all_data ad
LEFT JOIN bilibili_user bu ON ad.uid = bu.uid LEFT JOIN bili_user bu ON ad.uid = bu.uid
WHERE bu.uid IS NULL; WHERE bu.uid IS NULL;
`, `,
[], [],
@ -68,10 +65,3 @@ export async function getUnArchivedBiliUsers(client: Client) {
const rows = queryResult.rows; const rows = queryResult.rows;
return rows.map((row) => row.uid); return rows.map((row) => row.uid);
} }
export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
return await client.queryObject(
`UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
[status, aid],
);
}

lib/db/schema.d.ts vendored

@ -31,25 +31,3 @@ export interface VideoSnapshotType {
aid: bigint; aid: bigint;
replies: number; replies: number;
} }
export interface LatestSnapshotType {
aid: number;
time: number;
views: number;
danmakus: number;
replies: number;
likes: number;
coins: number;
shares: number;
favorites: number;
}
export interface SnapshotScheduleType {
id: number;
aid: number;
type?: string;
created_at: string;
started_at?: string;
finished_at?: string;
status: string;
}


@ -1,17 +1,44 @@
import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts"; import { VideoSnapshotType } from "lib/db/schema.d.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function getVideosNearMilestone(client: Client) { export async function getSongsNearMilestone(client: Client) {
const queryResult = await client.queryObject<LatestSnapshotType>(` const queryResult = await client.queryObject<VideoSnapshotType>(`
SELECT ls.* WITH max_views_per_aid AS (
FROM latest_video_snapshot ls -- aid views aid songs
SELECT
vs.aid,
MAX(vs.views) AS max_views
FROM
video_snapshot vs
INNER JOIN INNER JOIN
songs s ON ls.aid = s.aid songs s
ON
vs.aid = s.aid
GROUP BY
vs.aid
),
filtered_max_views AS (
-- views
SELECT
aid,
max_views
FROM
max_views_per_aid
WHERE WHERE
s.deleted = false AND (max_views >= 90000 AND max_views < 100000) OR
(views >= 90000 AND views < 100000) OR (max_views >= 900000 AND max_views < 1000000)
(views >= 900000 AND views < 1000000) OR )
(views >= 9900000 AND views < 10000000) --
SELECT
vs.*
FROM
video_snapshot vs
INNER JOIN
filtered_max_views fmv
ON
vs.aid = fmv.aid AND vs.views = fmv.max_views
`); `);
return queryResult.rows.map((row) => { return queryResult.rows.map((row) => {
return { return {
@ -21,20 +48,143 @@ export async function getVideosNearMilestone(client: Client) {
}); });
} }
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> { export async function getUnsnapshotedSongs(client: Client) {
const queryResult = await client.queryObject<LatestSnapshotType>(` const queryResult = await client.queryObject<{ aid: bigint }>(`
SELECT * SELECT DISTINCT s.aid
FROM latest_video_snapshot FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`);
return queryResult.rows.map((row) => Number(row.aid));
}
export async function getSongSnapshotCount(client: Client, aid: number) {
const queryResult = await client.queryObject<{ count: number }>(
`
SELECT COUNT(*) AS count
FROM video_snapshot
WHERE aid = $1;
`,
[aid],
);
return queryResult.rows[0].count;
}
export async function getShortTermEtaPrediction(client: Client, aid: number) {
const queryResult = await client.queryObject<{eta: number}>(
`
WITH old_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 AND
NOW() - created_at > '20 min'
ORDER BY created_at DESC
LIMIT 1
),
new_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 WHERE aid = $1
`, [aid]); ORDER BY created_at DESC
LIMIT 1
)
SELECT
CASE
WHEN n.views > 100000
THEN
(1000000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
ELSE
(100000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
END AS eta
FROM old_snapshot o, new_snapshot n;
`,
[aid],
);
if (queryResult.rows.length === 0) { if (queryResult.rows.length === 0) {
return null; return null;
} }
return queryResult.rows.map((row) => { return queryResult.rows[0].eta;
return {
...row,
aid: Number(row.aid),
time: new Date(row.time).getTime(),
} }
})[0];
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
const count = await getSongSnapshotCount(client, aid);
if (count < 2) {
return true;
}
const queryResult = await client.queryObject<
{ views1: number; created_at1: string; views2: number; created_at2: string }
>(
`
WITH latest_snapshot AS (
SELECT
aid,
views,
created_at
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
),
pairs AS (
SELECT
a.views AS views1,
a.created_at AS created_at1,
b.views AS views2,
b.created_at AS created_at2,
(b.created_at - a.created_at) AS interval
FROM video_snapshot a
JOIN latest_snapshot b
ON a.aid = b.aid
AND a.created_at < b.created_at
)
SELECT
views1,
created_at1,
views2,
created_at2
FROM (
SELECT
*,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
) AS rn
FROM pairs
) ranked
WHERE rn = 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return true;
}
const recentViewsData = queryResult.rows[0];
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
const intervalSec = (time2 - time1) / SECOND;
const views1 = recentViewsData.views1;
const views2 = recentViewsData.views2;
const viewsDiff = views2 - views1;
if (viewsDiff == 0) {
return false;
}
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
const expectedViewsDiff = nextMilestone - views2;
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
return expectedIntervalSec <= 3 * DAY;
} }
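Both getShortTermEtaPrediction and songEligibleForMilestoneSnapshot above reduce to the same linear extrapolation: divide the views still missing to the next milestone by the views-per-second rate observed between two snapshots. A minimal sketch of that arithmetic (not code from either commit; all numbers are made up):

const SECOND = 1000;
// Two snapshots of the same video, oldest first (hypothetical values).
const s1 = { views: 91_200, at: Date.parse("2025-03-14T00:00:00Z") };
const s2 = { views: 95_600, at: Date.parse("2025-03-15T00:00:00Z") };
const intervalSec = (s2.at - s1.at) / SECOND;             // 86400 s
const viewsPerSec = (s2.views - s1.views) / intervalSec;  // ~0.051 views/s
const nextMilestone = s2.views >= 100_000 ? 1_000_000 : 100_000;
const etaSec = (nextMilestone - s2.views) / viewsPerSec;  // ~86400 s, about one day
const eligible = etaSec <= 3 * 24 * 60 * 60;              // true: within the three-day cutoff
console.log({ viewsPerSec, etaSec, eligible });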


@ -1,228 +0,0 @@
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
import {SnapshotScheduleType} from "./schema.d.ts";
import logger from "lib/log/logger.ts";
export async function snapshotScheduleExists(client: Client, id: number) {
const res = await client.queryObject<{ id: number }>(
`SELECT id FROM snapshot_schedule WHERE id = $1`,
[id],
);
return res.rows.length > 0;
}
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
[aid],
);
return res.rows.length > 0;
}
export async function videoHasProcessingSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
[aid],
);
return res.rows.length > 0;
}
interface Snapshot {
created_at: number;
views: number;
}
export async function findClosestSnapshot(
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz)))
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
[aid],
);
return res.rows[0].count >= 2;
}
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
const res = await client.queryObject<{ created_at: string; views: number }>(
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
[aid],
);
if (res.rows.length === 0) return null;
const row = res.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
/*
* Returns the number of snapshot schedules within the specified range.
* @param client The database client.
* @param start The start time of the range. (Timestamp in milliseconds)
* @param end The end time of the range. (Timestamp in milliseconds)
*/
export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) {
const startTimeString = formatTimestampToPsql(start);
const endTimeString = formatTimestampToPsql(end);
const query = `
SELECT COUNT(*) FROM snapshot_schedule
WHERE started_at BETWEEN $1 AND $2
AND status = 'pending'
`;
const res = await client.queryObject<{ count: number }>(query, [startTimeString, endTimeString]);
return res.rows[0].count;
}
/*
* Creates a new snapshot schedule record.
* @param client The database client.
* @param aid The aid of the video.
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
if (await videoHasActiveSchedule(client, aid)) return;
const allowedCount = type === "milestone" ? 2000 : 800;
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
[aid, type, adjustedTime.toISOString()],
);
}
/**
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in each 5-minute window.
* @returns The adjusted actual snapshot time
*/
export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date,
allowedCounts: number = 2000
): Promise<Date> {
const findWindowQuery = `
WITH windows AS (
SELECT generate_series(
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
$2::timestamp, -- End time: 24 hours after the target time window starts
INTERVAL '5 MINUTES'
) AS window_start
)
SELECT w.window_start
FROM windows w
LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
AND s.status = 'pending'
GROUP BY w.window_start
HAVING COUNT(s.*) < ${allowedCounts}
ORDER BY w.window_start
LIMIT 1;
`;
const now = new Date();
const targetTime = expectedStartTime.getTime();
let start = new Date(targetTime - 2 * HOUR);
if (start.getTime() <= now.getTime()) {
start = now;
}
const startTruncated = truncateTo5MinInterval(start);
const end = new Date(startTruncated.getTime() + 1 * DAY);
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[startTruncated, end],
);
const windowStart = windowResult.rows[0]?.window_start;
if (!windowStart) {
return expectedStartTime;
}
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} else {
return expectedStartTime;
}
}
/**
* Truncate the timestamp to the nearest 5-minute interval
* @param timestamp The timestamp
* @returns The truncated time
*/
function truncateTo5MinInterval(timestamp: Date): Date {
const minutes = timestamp.getMinutes() - (timestamp.getMinutes() % 5);
return new Date(
timestamp.getFullYear(),
timestamp.getMonth(),
timestamp.getDate(),
timestamp.getHours(),
minutes,
0,
0,
);
}
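As a reading aid for the two deleted helpers above: adjustSnapshotTime floors its search start to a 5-minute bucket (via truncateTo5MinInterval), then walks forward bucket by bucket for up to 24 hours until it finds one whose count of pending schedules is below the quota, and finally adds a random offset inside that bucket. A self-contained sketch of the same search, with an in-memory count map standing in for the SQL window query (names and shapes here are illustrative, not the repository's API):

const FIVE_MIN = 5 * 60 * 1000;
const ONE_DAY = 24 * 60 * 60 * 1000;
function pickWindow(expected: Date, pendingPerBucket: Map<number, number>, allowed: number): Date {
  const start = Math.floor(expected.getTime() / FIVE_MIN) * FIVE_MIN; // floor to the 5-minute bucket
  for (let t = start; t < start + ONE_DAY; t += FIVE_MIN) {
    if ((pendingPerBucket.get(t) ?? 0) < allowed) {
      return new Date(t + Math.floor(Math.random() * FIVE_MIN)); // random delay inside the free bucket
    }
  }
  return expected; // every bucket in the next 24 h is full: fall back to the requested time
}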
export async function getSnapshotsInNextSecond(client: Client) {
const query = `
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
ORDER BY
CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 3;
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;
}
export async function setSnapshotStatus(client: Client, id: number, status: string) {
return await client.queryObject(
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
[id, status],
);
}
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
const query: string = `
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`;
const res = await client.queryObject<{ aid: number }>(query, []);
return res.rows.map((r) => Number(r.aid));
}


@ -1,107 +0,0 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/akari/3.17.onnx";
const onnxEmbeddingPath = "./model/embedding/model.onnx";
class AkariProto extends AIManager {
private tokenizer: PreTrainedTokenizer | null = null;
private readonly modelVersion = "3.17";
constructor() {
super();
this.models = {
"classifier": onnxClassifierPath,
"embedding": onnxEmbeddingPath,
};
}
public override async init(): Promise<void> {
await super.init();
await this.initJinaTokenizer();
}
private tokenizerInitialized(): boolean {
return this.tokenizer !== null;
}
private getTokenizer(): PreTrainedTokenizer {
if (!this.tokenizerInitialized()) {
throw new Error("Tokenizer is not initialized. Call init() first.");
}
return this.tokenizer!;
}
private async initJinaTokenizer(): Promise<void> {
if (this.tokenizerInitialized()) {
return;
}
try {
this.tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
logger.log("Tokenizer initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initTokenizer");
}
}
private async getJinaEmbeddings1024(texts: string[]): Promise<number[]> {
const tokenizer = this.getTokenizer();
const session = this.getModelSession("embedding");
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
private async runClassification(embeddings: number[]): Promise<number[]> {
const session = this.getModelSession("classifier");
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await session.run({ channel_features: inputTensor });
return this.softmax(logits.data as Float32Array);
}
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
const embeddings = await this.getJinaEmbeddings1024([
title,
description,
tags,
]);
const probabilities = await this.runClassification(embeddings);
if (aid) {
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
}
return probabilities.indexOf(Math.max(...probabilities));
}
public getModelVersion(): string {
return this.modelVersion;
}
}
const Akari = new AkariProto();
export default Akari;


@ -1,12 +1,6 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime"; import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// Configuration parameters // Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";


@ -0,0 +1,99 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.17";
let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: PreTrainedTokenizer | null = null;
export async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) {
return;
}
try {
tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
const [classifierSession, embeddingSession] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
]);
sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession;
logger.log("Filter models initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initializeModels");
}
}
export function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
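For reference, the softmax above is the standard numerically stable form: subtracting the maximum logit before exponentiating only prevents overflow and does not change the probabilities. A quick self-contained check (not part of the diff):

function stableSoftmax(logits: Float32Array): number[] {
  const maxLogit = Math.max(...logits);
  const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
  const sum = exponents.reduce((s, e) => s + e, 0);
  return Array.from(exponents.map((e) => e / sum));
}
console.log(stableSoftmax(new Float32Array([2, 1, 0])).map((p) => p.toFixed(3)));
// ["0.665", "0.245", "0.090"], identical to the naive softmax for these logits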
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
if (!tokenizer) {
throw new Error("Tokenizer is not initialized. Call initializeModels() first.");
}
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
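A reading aid for getONNXEmbeddings above: the embedding model is fed in embedding-bag style, that is, the token ids of all three texts flattened into a single input_ids tensor plus an offsets tensor marking where each text starts. A self-contained illustration of how those two arrays are built (token ids are hypothetical):

const input_ids = [[101, 202, 303], [404, 505], [606]]; // tokens for title, description, tags
const cumsum = (arr: number[]): number[] =>
  arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets = [0, ...cumsum(input_ids.slice(0, -1).map((x) => x.length))]; // [0, 3, 5]
const flattened = input_ids.flat();                                          // [101, 202, 303, 404, 505, 606]
console.log(offsets, flattened);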
async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
}
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
export async function classifyVideo(
title: string,
description: string,
tags: string,
aid: number,
): Promise<number> {
if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
}
const embeddings = await getONNXEmbeddings([
title,
description,
tags,
], sessionEmbedding);
const probabilities = await runClassification(embeddings);
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
return probabilities.indexOf(Math.max(...probabilities));
}


@ -1,37 +0,0 @@
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {};
public models: { [key: string]: string } = {};
constructor() {
}
public async init() {
const modelKeys = Object.keys(this.models);
for (const key of modelKeys) {
try {
this.sessions[key] = await ort.InferenceSession.create(this.models[key]);
logger.log(`Model ${key} initialized`, "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:init");
}
}
}
public getModelSession(key: string): ort.InferenceSession {
if (this.sessions[key] === undefined) {
throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession");
}
return this.sessions[key];
}
public softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
}


@ -1,22 +0,0 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
const modelPath = "./model/model.onnx";
class MantisProto extends AIManager {
constructor() {
super();
this.models = {
"predictor": modelPath,
};
}
public override async init(): Promise<void> {
await super.init();
}
}
const Mantis = new MantisProto();
export default Mantis;


@ -1,12 +1,6 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime"; import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// Configuration parameters // Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";


@ -1,7 +1,7 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { db } from "lib/db/init.ts"; import { db } from "lib/db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts"; import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
import Akari from "lib/ml/akari.ts"; import { classifyVideo } from "lib/ml/filter_inference.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
@ -19,14 +19,14 @@ export const classifyVideoWorker = async (job: Job) => {
const title = videoInfo.title?.trim() || "untitled"; const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A"; const description = videoInfo.description?.trim() || "N/A";
const tags = videoInfo.tags?.trim() || "empty"; const tags = videoInfo.tags?.trim() || "empty";
const label = await Akari.classifyVideo(title, description, tags, aid); const label = await classifyVideo(title, description, tags, aid);
if (label == -1) { if (label == -1) {
logger.warn(`Failed to classify video ${aid}`, "ml"); logger.warn(`Failed to classify video ${aid}`, "ml");
} }
await insertVideoLabel(client, aid, label); await insertVideoLabel(client, aid, label);
const exists = await aidExistsInSongs(client, aid); const exists = await aidExistsInSongs(client, aid);
if (!exists && label !== 0) { if (!exists) {
await insertIntoSongs(client, aid); await insertIntoSongs(client, aid);
} }


@ -1,199 +1,228 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { db } from "lib/db/init.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
import {
findClosestSnapshot,
getLatestSnapshot,
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
hasAtLeast2Snapshots,
scheduleSnapshot,
setSnapshotStatus,
snapshotScheduleExists,
videoHasProcessingSchedule,
} from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { WEEK, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { db } from "lib/db/init.ts";
import logger from "lib/log/logger.ts"; import {
getShortTermEtaPrediction,
getSongsNearMilestone,
getUnsnapshotedSongs,
songEligibleForMilestoneSnapshot,
} from "lib/db/snapshot.ts";
import { SnapshotQueue } from "lib/mq/index.ts"; import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts"; import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { redis } from "lib/db/redis.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts"; import logger from "lib/log/logger.ts";
import { formatSeconds } from "lib/utils/formatSeconds.ts";
import { truncate } from "lib/utils/truncate.ts"; import { truncate } from "lib/utils/truncate.ts";
const priorityMap: { [key: string]: number } = { async function snapshotScheduled(aid: number) {
"milestone": 1, try {
"normal": 3, return await redis.exists(`cvsa:snapshot:${aid}`);
}; } catch {
logger.error(`Failed to check scheduled status for ${aid}`, "mq");
return false;
}
}
const snapshotTypeToTaskMap: { [key: string]: string } = { async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
"milestone": "snapshotMilestoneVideo", try {
"normal": "snapshotVideo", if (value) {
}; await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
} else {
await redis.del(`cvsa:snapshot:${aid}`);
}
} catch {
logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
}
}
interface SongNearMilestone {
aid: number;
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
replies: number;
}
async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) {
let i = 0;
for (const snapshot of vidoesNearMilestone) {
if (await snapshotScheduled(snapshot.aid)) {
logger.silly(
`Video ${snapshot.aid} is already scheduled for snapshot`,
"mq",
"fn:processMilestoneSnapshots",
);
continue;
}
if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
logger.silly(
`Video ${snapshot.aid} is not eligible for milestone snapshot`,
"mq",
"fn:processMilestoneSnapshots",
);
continue;
}
const factor = Math.floor(i / 8);
const delayTime = factor * SECOND * 2;
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: snapshot.aid,
currentViews: snapshot.views,
snapshotedAt: snapshot.created_at,
}, { delay: delayTime, priority: 1 });
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
i++;
}
}
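A short note on the delay arithmetic above (the next helper below uses the same pattern with batches of 5 and a 4-second step): jobs are enqueued in fixed-size batches, and each batch is delayed a little more than the previous one so the snapshot requests are spread out. Illustration only, assuming SECOND = 1000 ms as in the std datetime constants:

const SECOND = 1000;
for (let i = 0; i < 20; i++) {
  const delayTime = Math.floor(i / 8) * SECOND * 2;
  console.log(`job ${i}: delay ${delayTime} ms`); // i = 0..7 -> 0 ms, 8..15 -> 2000 ms, 16..19 -> 4000 ms
}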
async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
let i = 0;
for (const aid of unsnapshotedVideos) {
if (await snapshotScheduled(aid)) {
logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
continue;
}
const factor = Math.floor(i / 5);
const delayTime = factor * SECOND * 4;
await SnapshotQueue.add("snapshotVideo", {
aid,
}, { delay: delayTime, priority: 3 });
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
i++;
}
}
export const snapshotTickWorker = async (_job: Job) => { export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect(); const client = await db.connect();
try { try {
const schedules = await getSnapshotsInNextSecond(client); const vidoesNearMilestone = await getSongsNearMilestone(client);
for (const schedule of schedules) { await processMilestoneSnapshots(client, vidoesNearMilestone);
let priority = 3;
if (schedule.type && priorityMap[schedule.type]) { const unsnapshotedVideos = await getUnsnapshotedSongs(client);
priority = priorityMap[schedule.type]; await processUnsnapshotedVideos(unsnapshotedVideos);
}
const aid = Number(schedule.aid);
await SnapshotQueue.add("snapshotVideo", {
aid: aid,
id: Number(schedule.id),
type: schedule.type ?? "normal",
}, { priority });
}
} catch (e) {
logger.error(e as Error);
} finally { } finally {
client.release(); client.release();
} }
}; };
export const closetMilestone = (views: number) => { export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
if (views < 100000) return 100000;
if (views < 1000000) return 1000000;
return 10000000;
};
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
/*
* Returns the minimum ETA in hours for the next snapshot
* @param client - Postgres client
* @param aid - aid of the video
* @returns ETA in hours
*/
const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const latestSnapshot = await getLatestSnapshot(client, aid);
// Immediately dispatch a snapshot if there is no snapshot yet
if (!latestSnapshot) return 0;
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
if (!snapshotsEnough) return 0;
const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;
for (const timeInterval of timeIntervals) {
const date = new Date(currentTimestamp - timeInterval);
const snapshot = await findClosestSnapshot(client, aid, date);
if (!snapshot) continue;
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
const viewsDiff = latestSnapshot.views - snapshot.views;
if (viewsDiff <= 0) continue;
const speed = viewsDiff / (hoursDiff + DELTA);
const target = closetMilestone(latestSnapshot.views);
const viewsToIncrease = target - latestSnapshot.views;
const eta = viewsToIncrease / (speed + DELTA);
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
factor = truncate(factor, 3, 100);
const adjustedETA = eta / factor;
if (adjustedETA < minETAHours) {
minETAHours = adjustedETA;
}
}
if (isNaN(minETAHours)) {
minETAHours = Infinity;
}
return minETAHours;
};
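For orientation (this block is not part of either commit): the deleted helper above measures growth speed over several look-back windows, turns it into an ETA to the next milestone, and then divides that ETA by a factor that grows as the remaining view gap shrinks, so snapshots are scheduled more aggressively right before a milestone. A worked sketch of the factor, using the same formula and a clamp standing in for the repository's truncate():

const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
const clamp = (v: number, lo: number, hi: number) => Math.min(Math.max(v, lo), hi);
for (const viewsToIncrease of [10, 100, 10_000]) {
  const factor = clamp(log(2.97 / log(viewsToIncrease + 1), 1.14), 3, 100);
  console.log(viewsToIncrease, factor.toFixed(1)); // 10 -> ~8.0, 100 -> ~3.0, 10000 -> 3.0 (clamped)
}
// adjustedETA = eta / factor, so a small remaining gap yields a much earlier next snapshot.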
export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
const client = await db.connect(); const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
try { try {
const videos = await getVideosNearMilestone(client); const aid: number = job.data.aid;
for (const video of videos) { const currentViews: number = job.data.currentViews;
const aid = Number(video.aid); const lastSnapshoted: string = job.data.snapshotedAt;
const eta = await getAdjustedShortTermETA(client, aid); const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
if (eta > 72) continue; if (typeof stat === "number") {
const now = Date.now(); if (stat === -404 || stat === 62002 || stat == 62012) {
const scheduledNextSnapshotDelay = eta * HOUR; await setSnapshotScheduled(aid, true, 6 * 60 * 60);
const maxInterval = 4 * HOUR; } else {
await setSnapshotScheduled(aid, false, 0);
}
return;
}
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
if (stat.views >= nextMilestone) {
await setSnapshotScheduled(aid, false, 0);
return;
}
let eta = await getShortTermEtaPrediction(client, aid);
if (eta === null) {
const DELTA = 0.001;
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
const viewsIncrement = stat.views - currentViews;
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
const viewsToIncrease = nextMilestone - stat.views;
eta = viewsToIncrease / (incrementSpeed + DELTA);
}
const scheduledNextSnapshotDelay = eta * SECOND / 3;
const maxInterval = 20 * MINUTE;
const minInterval = 1 * SECOND; const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay; await SnapshotQueue.add("snapshotMilestoneVideo", {
await scheduleSnapshot(client, aid, "milestone", targetTime); aid,
} currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay, priority: 1 });
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
etaInMins: eta / 60,
});
logger.log(
`Scheduled next milestone snapshot for ${aid} in ${
formatSeconds(delay / 1000)
}, current views: ${stat.views}`,
"mq",
);
} catch (e) { } catch (e) {
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
} finally { logger.warn(
client.release(); `No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForMilestoneVideoWorker",
);
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: job.data.aid,
currentViews: job.data.currentViews,
snapshotedAt: job.data.snapshotedAt,
}, { delay: 5 * SECOND, priority: 1 });
return;
} }
}; throw e;
export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
try {
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime);
}
} catch (e) {
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
} finally { } finally {
client.release(); client.release();
} }
}; };
export const takeSnapshotForVideoWorker = async (job: Job) => { export const takeSnapshotForVideoWorker = async (job: Job) => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const client = await db.connect(); const client = await db.connect();
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
return;
}
try { try {
if (await videoHasProcessingSchedule(client, aid)) { const { aid } = job.data;
return `ALREADY_PROCESSING`; const stat = await insertVideoStats(client, aid, "getVideoInfo");
}
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") { if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat); if (stat === -404 || stat === 62002 || stat == 62012) {
await setSnapshotStatus(client, id, "completed"); await setSnapshotScheduled(aid, true, 6 * 60 * 60);
return `BILI_STATUS_${stat}`; } else {
await setSnapshotScheduled(aid, false, 0);
} }
await setSnapshotStatus(client, id, "completed");
if (type === "normal") {
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
return `DONE`;
}
if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) return "ETA_TOO_LONG";
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, aid, type, targetTime);
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(client, id, "completed");
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
return; return;
} }
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); logger.log(`Taken snapshot for ${aid}`, "mq");
await setSnapshotStatus(client, id, "failed"); if (stat == null) {
setSnapshotScheduled(aid, false, 0);
return;
}
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
});
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
(stat.views >= 900000 && stat.views < 1000000);
if (nearMilestone) {
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay: 0, priority: 1 });
}
await setSnapshotScheduled(aid, false, 0);
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
await setSnapshotScheduled(job.data.aid, false, 0);
return;
}
throw e;
} finally { } finally {
client.release(); client.release();
} }


@ -1,39 +1,24 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
export async function initMQ() { export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { // await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE, // every: 1 * MINUTE,
immediately: true, // immediately: true,
}); // });
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { // await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE, // every: 5 * MINUTE,
immediately: true, // immediately: true,
}); // });
await LatestVideosQueue.upsertJobScheduler("collectSongs", { // await LatestVideosQueue.upsertJobScheduler("collectSongs", {
// every: 3 * MINUTE,
// immediately: true,
// });
await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
every: 3 * MINUTE, every: 3 * MINUTE,
immediately: true, immediately: true,
}); });
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
every: 1 * SECOND,
immediately: true,
}, {
opts: {
removeOnComplete: 1,
removeOnFail: 1,
},
});
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
every: 5 * MINUTE,
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
every: 30 * MINUTE,
immediately: true,
});
logger.log("Message queue initialized."); logger.log("Message queue initialized.");
} }


@ -21,7 +21,7 @@ interface ProxiesMap {
} }
type NetSchedulerErrorCode = type NetSchedulerErrorCode =
| "NO_PROXY_AVAILABLE" | "NO_AVAILABLE_PROXY"
| "PROXY_RATE_LIMITED" | "PROXY_RATE_LIMITED"
| "PROXY_NOT_FOUND" | "PROXY_NOT_FOUND"
| "FETCH_ERROR" | "FETCH_ERROR"
@ -143,10 +143,10 @@ class NetScheduler {
* @param {string} method - The HTTP method to use for the request. Default is "GET". * @param {string} method - The HTTP method to use for the request. Default is "GET".
* @returns {Promise<any>} - A promise that resolves to the response body. * @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases: * @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` * - No available proxy currently: with error code NO_AVAILABLE_PROXY
* - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` * - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED` * - The proxy type is not supported: with error code NOT_IMPLEMENTED
*/ */
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> { async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
// find a available proxy // find a available proxy
@ -156,7 +156,7 @@ class NetScheduler {
return await this.proxyRequest<R>(url, proxyName, task, method); return await this.proxyRequest<R>(url, proxyName, task, method);
} }
} }
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE"); throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
} }
/* /*
@ -168,11 +168,10 @@ class NetScheduler {
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false. * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body. * @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases: * @throws {NetSchedulerError} - The error will be thrown in following cases:
* - Proxy not found: with error code `PROXY_NOT_FOUND` * - Proxy not found: with error code PROXY_NOT_FOUND
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED` * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` * - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/ */
async proxyRequest<R>( async proxyRequest<R>(
url: string, url: string,
@ -256,6 +255,8 @@ class NetScheduler {
} }
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> { private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
let rawOutput: null | Uint8Array = null;
let rawErr: null | Uint8Array = null;
try { try {
const decoder = new TextDecoder(); const decoder = new TextDecoder();
const output = await new Deno.Command("aliyun", { const output = await new Deno.Command("aliyun", {
@ -279,9 +280,15 @@ class NetScheduler {
`CVSA-${region}`, `CVSA-${region}`,
], ],
}).output(); }).output();
rawOutput = output.stdout;
rawErr = output.stderr;
const out = decoder.decode(output.stdout); const out = decoder.decode(output.stdout);
const rawData = JSON.parse(out); const rawData = JSON.parse(out);
if (rawData.statusCode !== 200) { if (rawData.statusCode !== 200) {
const fileId = randomUUID();
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
logger.log(`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
throw new NetSchedulerError( throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`, `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
"ALICLOUD_PROXY_ERR", "ALICLOUD_PROXY_ERR",
@ -290,6 +297,12 @@ class NetScheduler {
return JSON.parse(JSON.parse(rawData.body)) as R; return JSON.parse(JSON.parse(rawData.body)) as R;
} }
} catch (e) { } catch (e) {
if (rawOutput !== null || rawErr !== null) {
const fileId = randomUUID();
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
logger.log(`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
}
logger.error(e as Error, "net", "fn:alicloudFcRequest"); logger.error(e as Error, "net", "fn:alicloudFcRequest");
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e); throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
} }
@ -348,8 +361,7 @@ Execution order for setup:
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies. - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly. - Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config): 4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask. - Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to. - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter). In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
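To make that ordering concrete, a hypothetical stub (the method names follow the comment above; the argument shapes are illustrative guesses, not the actual signatures in lib/mq/scheduler.ts):

const scheduler = {
  addProxy: (name: string, type: string) => console.log("addProxy", name, type),
  addTask: (name: string, provider: string, proxies: string[] | "all") => console.log("addTask", name, provider, proxies),
  setTaskLimiter: (task: string, limits: { window: number; max: number }[]) => console.log("setTaskLimiter", task, limits),
  setProviderLimiter: (provider: string, limits: { window: number; max: number }[]) => console.log("setProviderLimiter", provider, limits),
};
scheduler.addProxy("alicloud-hz", "alicloud-fc");                        // 1. register proxies first
scheduler.addTask("getVideoInfo", "bilibili", "all");                    // 2. then the tasks that use them
scheduler.setTaskLimiter("getVideoInfo", [{ window: 1000, max: 5 }]);    // 3. per-task rate limits
scheduler.setProviderLimiter("bilibili", [{ window: 1000, max: 10 }]);   // 4. provider-wide rate limits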


@ -15,11 +15,12 @@ export async function collectSongs(client: Client) {
export async function insertIntoSongs(client: Client, aid: number) { export async function insertIntoSongs(client: Client, aid: number) {
await client.queryObject( await client.queryObject(
` `
INSERT INTO songs (aid, published_at, duration) INSERT INTO songs (aid, bvid, published_at, duration)
VALUES ( VALUES (
$1, $1,
(SELECT published_at FROM bilibili_metadata WHERE aid = $1), (SELECT bvid FROM all_data WHERE aid = $1),
(SELECT duration FROM bilibili_metadata WHERE aid = $1) (SELECT published_at FROM all_data WHERE aid = $1),
(SELECT duration FROM all_data WHERE aid = $1)
) )
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
`, `,


@ -4,7 +4,6 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts"; import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function insertVideoInfo(client: Client, aid: number) { export async function insertVideoInfo(client: Client, aid: number) {
const videoExists = await videoExistsInAllData(client, aid); const videoExists = await videoExistsInAllData(client, aid);
@ -19,25 +18,25 @@ export async function insertVideoInfo(client: Client, aid: number) {
const desc = data.View.desc; const desc = data.View.desc;
const uid = data.View.owner.mid; const uid = data.View.owner.mid;
const tags = data.Tags const tags = data.Tags
.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) .filter((tag) => tag.tag_type in ["old_channel", "topic"])
.map((tag) => tag.tag_name).join(","); .map((tag) => tag.tag_name).join(",");
const title = data.View.title; const title = data.View.title;
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const published_at = formatTimestampToPsql(data.View.pubdate);
const duration = data.View.duration; const duration = data.View.duration;
await client.queryObject( await client.queryObject(
`INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration) `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[aid, bvid, desc, uid, tags, title, published_at, duration], [aid, bvid, desc, uid, tags, title, published_at, duration],
); );
const userExists = await userExistsInBiliUsers(client, aid); const userExists = await userExistsInBiliUsers(client, aid);
if (!userExists) { if (!userExists) {
await client.queryObject( await client.queryObject(
`INSERT INTO bilibili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`, `INSERT INTO bili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`,
[uid, data.View.owner.name, data.Card.card.sign, data.Card.follower], [uid, data.View.owner.name, data.Card.card.sign, data.Card.follower],
); );
} else { } else {
await client.queryObject( await client.queryObject(
`UPDATE bilibili_user SET fans = $1 WHERE uid = $2`, `UPDATE bili_user SET fans = $1 WHERE uid = $2`,
[data.Card.follower, uid], [data.Card.follower, uid],
); );
} }


@ -1,28 +1,12 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts"; import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";
/* export async function insertVideoStats(client: Client, aid: number, task: string) {
* Fetch video stats from bilibili API and insert into database
* @returns {Promise<number|VideoSnapshot>}
* A number indicating the status code when receiving non-0 status code from bilibili,
* otherwise a VideoSnapshot object containing the video stats
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function insertVideoSnapshot(
client: Client,
aid: number,
task: string,
): Promise<number | LatestSnapshotType> {
const data = await getVideoInfo(aid, task); const data = await getVideoInfo(aid, task);
if (typeof data == "number") { const time = new Date().getTime();
if (typeof data == 'number') {
return data; return data;
} }
const time = new Date().getTime();
const views = data.stat.view; const views = data.stat.view;
const danmakus = data.stat.danmaku; const danmakus = data.stat.danmaku;
const replies = data.stat.reply; const replies = data.stat.reply;
@ -30,19 +14,11 @@ export async function insertVideoSnapshot(
const coins = data.stat.coin; const coins = data.stat.coin;
const shares = data.stat.share; const shares = data.stat.share;
const favorites = data.stat.favorite; const favorites = data.stat.favorite;
await client.queryObject(`
const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`; `, [aid, views, danmakus, replies, likes, coins, shares, favorites]);
await client.queryObject( return {
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
const snapshot: LatestSnapshotType = {
aid, aid,
views, views,
danmakus, danmakus,
@ -51,8 +27,6 @@ export async function insertVideoSnapshot(
coins, coins,
shares, shares,
favorites, favorites,
time, time
}; }
return snapshot;
} }


@ -26,8 +26,8 @@ interface VideoInfoData {
mid: number; mid: number;
name: string; name: string;
face: string; face: string;
}; },
stat: VideoStats; stat: VideoStats,
} }
interface VideoDetailsData { interface VideoDetailsData {


@ -2,19 +2,6 @@ import netScheduler from "lib/mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts"; import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
/*
* Fetch video metadata from bilibili API
* @param {number} aid - The video's aid
* @param {string} task - The task name used in scheduler. It can be one of the following:
* - snapshotVideo
* - getVideoInfo
* - snapshotMilestoneVideo
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> { export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`; const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
const data = await netScheduler.request<VideoInfoResponse>(url, task); const data = await netScheduler.request<VideoInfoResponse>(url, task);
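
A brief sketch of the error-code contract described in the doc comment above; the aid is the example id used in this diff's tests, and error handling is illustrative only (a NetSchedulerError may still propagate).

import { getVideoInfo } from "lib/net/getVideoInfo.ts";

const info = await getVideoInfo(247308539, "getVideoInfo");
if (typeof info === "number") {
	// a non-0 bilibili status code was returned instead of data
	console.warn(`bilibili returned status code ${info}`);
} else {
	// VideoInfoData: stat carries the counters consumed by insertVideoSnapshot
	console.log(info.stat.view, info.stat.danmaku, info.stat.reply);
}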

View File

@ -1,9 +1,9 @@
export const formatSeconds = (seconds: number) => { export const formatSeconds = (seconds: number) => {
if (seconds < 60) { if (seconds < 60) {
return `${seconds.toFixed(1)}s`; return `${(seconds).toFixed(1)}s`;
} }
if (seconds < 3600) { if (seconds < 3600) {
return `${Math.floor(seconds / 60)}m${(seconds % 60).toFixed(1)}s`; return `${Math.floor(seconds / 60)}m${seconds % 60}s`;
} }
return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`; return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`;
}; };
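
Sample outputs of the left-hand (newer) formatSeconds above, for reference; the import path is an assumption.

import { formatSeconds } from "lib/utils/formatSeconds.ts"; // assumed path

formatSeconds(42.5); // "42.5s"
formatSeconds(125);  // "2m5.0s" (minutes floored, remaining seconds to one decimal)
formatSeconds(7500); // "2h 5.00m" (hours floored, remaining minutes to two decimals)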

View File

@ -4,20 +4,20 @@ from model import CompactPredictor
import torch import torch
def main(): def main():
model = CompactPredictor(10).to('cpu', dtype=torch.float32) model = CompactPredictor(16).to('cpu', dtype=torch.float32)
model.load_state_dict(torch.load('./pred/checkpoints/long_term.pt')) model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
model.eval() model.eval()
# inference # inference
initial = 997029 initial = 999269
last = initial last = initial
start_time = '2025-03-17 00:13:17' start_time = '2025-03-15 01:03:21'
for i in range(1, 120): for i in range(1, 48):
hour = i / 0.5 hour = i / 0.5
sec = hour * 3600 sec = hour * 3600
time_d = np.log2(sec) time_d = np.log2(sec)
data = [time_d, np.log2(initial+1), # time_delta, current_views data = [time_d, np.log2(initial+1), # time_delta, current_views
6.111542, 8.404707, 10.071566, 11.55888, 12.457823,# grows_feat 2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067, # grows_feat
0.009225, 0.001318, 28.001814# time_feat 0.043993, 0.72057, 28.000902 # time_feat
] ]
np_arr = np.array([data]) np_arr = np.array([data])
tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32) tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
@ -25,7 +25,7 @@ def main():
num = output.detach().numpy()[0][0] num = output.detach().numpy()[0][0]
views_pred = int(np.exp2(num)) + initial views_pred = int(np.exp2(num)) + initial
current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour) current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
print(current_time.strftime('%m-%d %H:%M:%S'), views_pred, views_pred - last) print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
last = views_pred last = views_pred
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -1,10 +1,10 @@
import { ConnectionOptions, Job, Worker } from "bullmq"; import { Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts"; import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "lib/mq/schema.ts"; import { WorkerError } from "lib/mq/schema.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import Akari from "lib/ml/akari.ts"; import { initializeModels } from "lib/ml/filter_inference.ts";
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq"); logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -18,7 +18,7 @@ Deno.addSignalListener("SIGTERM", async () => {
Deno.exit(); Deno.exit();
}); });
Akari.init(); await initializeModels();
const filterWorker = new Worker( const filterWorker = new Worker(
"classifyVideo", "classifyVideo",
@ -32,7 +32,7 @@ const filterWorker = new Worker(
break; break;
} }
}, },
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }, { connection: redis, concurrency: 2, removeOnComplete: { count: 1000 } },
); );
filterWorker.on("active", () => { filterWorker.on("active", () => {

View File

@ -1,27 +1,21 @@
import { ConnectionOptions, Job, Worker } from "bullmq"; import { Job, Worker } from "bullmq";
import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts"; import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "lib/mq/schema.ts"; import { WorkerError } from "lib/mq/schema.ts";
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
import { import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
snapshotTickWorker,
takeSnapshotForVideoWorker,
} from "lib/mq/exec/snapshotTick.ts";
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq"); logger.log("SIGINT Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true); await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit(); Deno.exit();
}); });
Deno.addSignalListener("SIGTERM", async () => { Deno.addSignalListener("SIGTERM", async () => {
logger.log("SIGTERM Received: Shutting down workers...", "mq"); logger.log("SIGTERM Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true); await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit(); Deno.exit();
}); });
@ -42,12 +36,7 @@ const latestVideoWorker = new Worker(
break; break;
} }
}, },
{ { connection: redis, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
connection: redis as ConnectionOptions,
concurrency: 6,
removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 },
},
); );
latestVideoWorker.on("active", () => { latestVideoWorker.on("active", () => {
@ -67,26 +56,23 @@ const snapshotWorker = new Worker(
"snapshot", "snapshot",
async (job: Job) => { async (job: Job) => {
switch (job.name) { switch (job.name) {
case "snapshotVideo": case "scheduleSnapshotTick":
await takeSnapshotForVideoWorker(job);
break;
case "snapshotTick":
await snapshotTickWorker(job); await snapshotTickWorker(job);
break; break;
case "collectMilestoneSnapshots": case "snapshotMilestoneVideo":
await collectMilestoneSnapshotsWorker(job); await takeSnapshotForMilestoneVideoWorker(job);
break; break;
case "dispatchRegularSnapshots": case "snapshotVideo":
await regularSnapshotsWorker(job); await takeSnapshotForVideoWorker(job);
break; break;
default: default:
break; break;
} }
}, },
{ connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } }, { connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } },
); );
snapshotWorker.on("error", (err) => { snapshotWorker.on("error", (err) => {
const e = err as WorkerError; const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath); logger.error(e.rawError, e.service, e.codePath);
}); })
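
A hedged sketch of how jobs could be enqueued to match the job names handled by the left-hand snapshot worker above; the enqueuing file and the payload shapes are assumptions, only the queue name and job names come from this diff.

import { ConnectionOptions, Queue } from "bullmq";
import { redis } from "lib/db/redis.ts";

// Job names must match the switch in the left-hand snapshot worker above.
const snapshotQueue = new Queue("snapshot", { connection: redis as ConnectionOptions });
await snapshotQueue.add("snapshotTick", {});
await snapshotQueue.add("snapshotVideo", { aid: 247308539 }); // payload shape assumed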

View File

@ -1,18 +0,0 @@
import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
const client = new Client(postgresConfig);
try {
const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
assertNotEquals(result, null);
const created_at = result!.created_at;
const views = result!.views;
assertInstanceOf(created_at, Date);
assertEquals(typeof views, "number");
} finally {
client.end();
}
});

View File

@ -1,22 +0,0 @@
{
"test1": [
{
"title": "【洛天依】《一花依世界》2024重调版|“抬头仰望夜空多安详”【原创PV付】",
"desc": "本家BV1Vs411H7JH\n作曲LS\n作词杏花包子\n调教鬼面P\n混音虎皮猫P\n演唱洛天依\n曲绘山下鸭鸭窝\n映像阿妍\n——————————————————————\n本稿为同人二创非本家重制",
"tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语",
"label": 2
},
{
"title": "【鏡音レン】アカシア【VOCALOID Cover】",
"desc": "鏡音リン・レン 13th Anniversary\n\nMusicBUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nustMaplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger鏡音レン\n\n氷雨ハルカ\nYoutube https://t.co/8zuv6g7Acm\nniconicohttps://t.co/C6DRfdYAp0\ntwitter https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.",
"tags": "鏡音レン",
"label": 0
},
{
"title": "【洛天依原创曲】谪星【姆斯塔之谕】",
"desc": "谪星\n\n策划/世界观:听雨\n作词听雨\n作曲/编曲:太白\n混音虎皮猫\n人设以木\n曲绘Ar极光\n调校哈士奇p\n视频苏卿白",
"tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID",
"label": 1
}
]
}

View File

@ -1,46 +0,0 @@
import Akari from "lib/ml/akari.ts";
import { assertEquals, assertGreaterOrEqual } from "jsr:@std/assert";
import { join } from "$std/path/join.ts";
import { SECOND } from "$std/datetime/constants.ts";
Deno.test("Akari AI - normal cases accuracy test", async () => {
const path = import.meta.dirname!;
const dataPath = join(path, "akari.json");
const rawData = await Deno.readTextFile(dataPath);
const data = JSON.parse(rawData);
await Akari.init();
for (const testCase of data.test1) {
const result = await Akari.classifyVideo(
testCase.title,
testCase.desc,
testCase.tags,
);
assertEquals(result, testCase.label);
}
});
Deno.test("Akari AI - performance test", async () => {
const path = import.meta.dirname!;
const dataPath = join(path, "akari.json");
const rawData = await Deno.readTextFile(dataPath);
const data = JSON.parse(rawData);
await Akari.init();
const N = 200;
const testCase = data.test1[0];
const title = testCase.title;
const desc = testCase.desc;
const tags = testCase.tags;
const time = performance.now();
for (let i = 0; i < N; i++) {
await Akari.classifyVideo(
title,
desc,
tags,
);
}
const end = performance.now();
const elapsed = (end - time) / SECOND;
const throughput = N / elapsed;
assertGreaterOrEqual(throughput, 100);
console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`);
});