Compare commits
27 Commits
afffbd8ecb ... 35d58be8fd

35d58be8fd
3a0dd26c68
9e764746fb
33c63fc29f
b654eb3643
7768a202b2
8652ac8fb7
18fc9752bb
2c12310e8c
b201bfd64d
e38dc96275
9f9ef800d1
e5534cda24
559c63b434
1895d601d9
fabb77d98d
8158ce10c0
00b52c01f7
2e8ed7ce70
cd8aa826e1
b07d0c18f9
a9ac8de547
0ff1c78dcc
a6c8fd7f3f
7104a95af9
5af2236109
93bdddc21e
@@ -1,21 +1,24 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
import { modelVersion } from "lib/ml/filter_inference.ts";
import Akari from "lib/ml/akari.ts";

export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
return await client.queryObject<{ exists: boolean }>(
`SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`,
[aid],
)
.then((result) => result.rows[0].exists);
}

export async function userExistsInBiliUsers(client: Client, uid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [
uid,
]);
}

export async function getUnlabelledVideos(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(
`SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
`SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
);
return queryResult.rows.map((row) => row.aid);
}

@@ -23,20 +26,20 @@ export async function getUnlabelledVideos(client: Client) {
export async function insertVideoLabel(client: Client, aid: number, label: number) {
return await client.queryObject(
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
[aid, label, modelVersion],
[aid, label, Akari.getModelVersion()],
);
}

export async function getVideoInfoFromAllData(client: Client, aid: number) {
const queryResult = await client.queryObject<AllDataType>(
`SELECT * FROM all_data WHERE aid = $1`,
`SELECT * FROM bilibili_metadata WHERE aid = $1`,
[aid],
);
const row = queryResult.rows[0];
let authorInfo = "";
if (row.uid && await userExistsInBiliUsers(client, row.uid)) {
const q = await client.queryObject<BiliUserType>(
`SELECT * FROM bili_user WHERE uid = $1`,
`SELECT * FROM bilibili_user WHERE uid = $1`,
[row.uid],
);
const userRow = q.rows[0];

@@ -56,8 +59,8 @@ export async function getUnArchivedBiliUsers(client: Client) {
const queryResult = await client.queryObject<{ uid: number }>(
`
SELECT ad.uid
FROM all_data ad
LEFT JOIN bili_user bu ON ad.uid = bu.uid
FROM bilibili_metadata ad
LEFT JOIN bilibili_user bu ON ad.uid = bu.uid
WHERE bu.uid IS NULL;
`,
[],

@@ -65,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
const rows = queryResult.rows;
return rows.map((row) => row.uid);
}

export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
return await client.queryObject(
`UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
[status, aid],
);
}
22 lib/db/schema.d.ts vendored
@@ -31,3 +31,25 @@ export interface VideoSnapshotType {
aid: bigint;
replies: number;
}

export interface LatestSnapshotType {
aid: number;
time: number;
views: number;
danmakus: number;
replies: number;
likes: number;
coins: number;
shares: number;
favorites: number;
}

export interface SnapshotScheduleType {
id: number;
aid: number;
type?: string;
created_at: string;
started_at?: string;
finished_at?: string;
status: string;
}
@@ -1,44 +1,17 @@
import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { VideoSnapshotType } from "lib/db/schema.d.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";

export async function getSongsNearMilestone(client: Client) {
const queryResult = await client.queryObject<VideoSnapshotType>(`
WITH max_views_per_aid AS (
-- Find the maximum views for each aid, and make sure the aid exists in the songs table
SELECT
vs.aid,
MAX(vs.views) AS max_views
FROM
video_snapshot vs
export async function getVideosNearMilestone(client: Client) {
const queryResult = await client.queryObject<LatestSnapshotType>(`
SELECT ls.*
FROM latest_video_snapshot ls
INNER JOIN
songs s
ON
vs.aid = s.aid
GROUP BY
vs.aid
),
filtered_max_views AS (
-- Filter out the maximum views that meet the condition
SELECT
aid,
max_views
FROM
max_views_per_aid
songs s ON ls.aid = s.aid
WHERE
(max_views >= 90000 AND max_views < 100000) OR
(max_views >= 900000 AND max_views < 1000000)
)
-- Fetch the full rows that match the condition
SELECT
vs.*
FROM
video_snapshot vs
INNER JOIN
filtered_max_views fmv
ON
vs.aid = fmv.aid AND vs.views = fmv.max_views
s.deleted = false AND
(views >= 90000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
(views >= 9900000 AND views < 10000000)
`);
return queryResult.rows.map((row) => {
return {
@@ -48,143 +21,20 @@ export async function getSongsNearMilestone(client: Client) {
});
}

export async function getUnsnapshotedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: bigint }>(`
SELECT DISTINCT s.aid
FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`);
return queryResult.rows.map((row) => Number(row.aid));
}

export async function getSongSnapshotCount(client: Client, aid: number) {
const queryResult = await client.queryObject<{ count: number }>(
`
SELECT COUNT(*) AS count
FROM video_snapshot
WHERE aid = $1;
`,
[aid],
);
return queryResult.rows[0].count;
}

export async function getShortTermEtaPrediction(client: Client, aid: number) {
const queryResult = await client.queryObject<{eta: number}>(
`
WITH old_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 AND
NOW() - created_at > '20 min'
ORDER BY created_at DESC
LIMIT 1
),
new_snapshot AS (
SELECT created_at, views
FROM video_snapshot
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
const queryResult = await client.queryObject<LatestSnapshotType>(`
SELECT *
FROM latest_video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
)
SELECT
CASE
WHEN n.views > 100000
THEN
(1000000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
ELSE
(100000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
END AS eta
FROM old_snapshot o, new_snapshot n;
`,
[aid],
);
`, [aid]);
if (queryResult.rows.length === 0) {
return null;
}
return queryResult.rows[0].eta;
}

export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
const count = await getSongSnapshotCount(client, aid);
if (count < 2) {
return true;
}
const queryResult = await client.queryObject<
{ views1: number; created_at1: string; views2: number; created_at2: string }
>(
`
WITH latest_snapshot AS (
SELECT
aid,
views,
created_at
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
),
pairs AS (
SELECT
a.views AS views1,
a.created_at AS created_at1,
b.views AS views2,
b.created_at AS created_at2,
(b.created_at - a.created_at) AS interval
FROM video_snapshot a
JOIN latest_snapshot b
ON a.aid = b.aid
AND a.created_at < b.created_at
)
SELECT
views1,
created_at1,
views2,
created_at2
FROM (
SELECT
*,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
) AS rn
FROM pairs
) ranked
WHERE rn = 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return true;
}
const recentViewsData = queryResult.rows[0];
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
const intervalSec = (time2 - time1) / SECOND;
const views1 = recentViewsData.views1;
const views2 = recentViewsData.views2;
const viewsDiff = views2 - views1;
if (viewsDiff == 0) {
return false;
}
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
const expectedViewsDiff = nextMilestone - views2;
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
return expectedIntervalSec <= 3 * DAY;
return queryResult.rows.map((row) => {
return {
...row,
aid: Number(row.aid),
time: new Date(row.time).getTime(),
}
})[0];
}
228 lib/db/snapshotSchedule.ts Normal file
@@ -0,0 +1,228 @@
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
import {SnapshotScheduleType} from "./schema.d.ts";
import logger from "lib/log/logger.ts";

export async function snapshotScheduleExists(client: Client, id: number) {
const res = await client.queryObject<{ id: number }>(
`SELECT id FROM snapshot_schedule WHERE id = $1`,
[id],
);
return res.rows.length > 0;
}

/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
[aid],
);
return res.rows.length > 0;
}

export async function videoHasProcessingSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
[aid],
);
return res.rows.length > 0;
}

interface Snapshot {
created_at: number;
views: number;
}

export async function findClosestSnapshot(
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz)))
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}

export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
[aid],
);
return res.rows[0].count >= 2;
}

export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
const res = await client.queryObject<{ created_at: string; views: number }>(
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
[aid],
);
if (res.rows.length === 0) return null;
const row = res.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}

/*
* Returns the number of snapshot schedules within the specified range.
* @param client The database client.
* @param start The start time of the range. (Timestamp in milliseconds)
* @param end The end time of the range. (Timestamp in milliseconds)
*/
export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) {
const startTimeString = formatTimestampToPsql(start);
const endTimeString = formatTimestampToPsql(end);
const query = `
SELECT COUNT(*) FROM snapshot_schedule
WHERE started_at BETWEEN $1 AND $2
AND status = 'pending'
`;
const res = await client.queryObject<{ count: number }>(query, [startTimeString, endTimeString]);
return res.rows[0].count;
}

/*
* Creates a new snapshot schedule record.
* @param client The database client.
* @param aid The aid of the video.
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
if (await videoHasActiveSchedule(client, aid)) return;
const allowedCount = type === "milestone" ? 2000 : 800;
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
[aid, type, adjustedTime.toISOString()],
);
}

/**
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in each 5-minute window.
* @returns The adjusted actual snapshot time
*/
export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date,
allowedCounts: number = 2000
): Promise<Date> {
const findWindowQuery = `
WITH windows AS (
SELECT generate_series(
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
$2::timestamp, -- End time: 24 hours after the target time window starts
INTERVAL '5 MINUTES'
) AS window_start
)
SELECT w.window_start
FROM windows w
LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
AND s.status = 'pending'
GROUP BY w.window_start
HAVING COUNT(s.*) < ${allowedCounts}
ORDER BY w.window_start
LIMIT 1;
`;
const now = new Date();
const targetTime = expectedStartTime.getTime();
let start = new Date(targetTime - 2 * HOUR);
if (start.getTime() <= now.getTime()) {
start = now;
}
const startTruncated = truncateTo5MinInterval(start);
const end = new Date(startTruncated.getTime() + 1 * DAY);

const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[startTruncated, end],
);

const windowStart = windowResult.rows[0]?.window_start;
if (!windowStart) {
return expectedStartTime;
}

if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} else {
return expectedStartTime;
}
}

/**
* Truncate the timestamp to the nearest 5-minute interval
* @param timestamp The timestamp
* @returns The truncated time
*/
function truncateTo5MinInterval(timestamp: Date): Date {
const minutes = timestamp.getMinutes() - (timestamp.getMinutes() % 5);
return new Date(
timestamp.getFullYear(),
timestamp.getMonth(),
timestamp.getDate(),
timestamp.getHours(),
minutes,
0,
0,
);
}

export async function getSnapshotsInNextSecond(client: Client) {
const query = `
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
ORDER BY
CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 3;
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;
}

export async function setSnapshotStatus(client: Client, id: number, status: string) {
return await client.queryObject(
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
[id, status],
);
}

export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
const query: string = `
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`;
const res = await client.queryObject<{ aid: number }>(query, []);
return res.rows.map((r) => Number(r.aid));
}
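Read together, the functions above form a small scheduling API. The following is an illustrative sketch, not part of the changeset: the aid and delay values are placeholders, and `db` is assumed to be the pool from lib/db/init.ts used elsewhere in this diff.

import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { getSnapshotsInNextSecond, scheduleSnapshot, setSnapshotStatus } from "lib/db/snapshotSchedule.ts";

const client = await db.connect();
try {
    // Request a milestone snapshot ~10 minutes out; adjustSnapshotTime() may
    // push it into the next 5-minute window that is still under the rate limit.
    await scheduleSnapshot(client, 170001, "milestone", Date.now() + 10 * MINUTE); // 170001 is a placeholder aid
    // A tick worker then drains due schedules and marks them as processing.
    for (const schedule of await getSnapshotsInNextSecond(client)) {
        await setSnapshotStatus(client, Number(schedule.id), "processing");
    }
} finally {
    client.release();
}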
107 lib/ml/akari.ts Normal file
@@ -0,0 +1,107 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";

const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/akari/3.17.onnx";
const onnxEmbeddingPath = "./model/embedding/model.onnx";

class AkariProto extends AIManager {
private tokenizer: PreTrainedTokenizer | null = null;
private readonly modelVersion = "3.17";

constructor() {
super();
this.models = {
"classifier": onnxClassifierPath,
"embedding": onnxEmbeddingPath,
};
}

public override async init(): Promise<void> {
await super.init();
await this.initJinaTokenizer();
}

private tokenizerInitialized(): boolean {
return this.tokenizer !== null;
}

private getTokenizer(): PreTrainedTokenizer {
if (!this.tokenizerInitialized()) {
throw new Error("Tokenizer is not initialized. Call init() first.");
}
return this.tokenizer!;
}

private async initJinaTokenizer(): Promise<void> {
if (this.tokenizerInitialized()) {
return;
}
try {
this.tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
logger.log("Tokenizer initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initTokenizer");
}
}

private async getJinaEmbeddings1024(texts: string[]): Promise<number[]> {
const tokenizer = this.getTokenizer();
const session = this.getModelSession("embedding");

const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});

const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);

const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();

const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};

const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}

private async runClassification(embeddings: number[]): Promise<number[]> {
const session = this.getModelSession("classifier");
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);

const { logits } = await session.run({ channel_features: inputTensor });
return this.softmax(logits.data as Float32Array);
}

public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
const embeddings = await this.getJinaEmbeddings1024([
title,
description,
tags,
]);
const probabilities = await this.runClassification(embeddings);
if (aid) {
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
}
return probabilities.indexOf(Math.max(...probabilities));
}

public getModelVersion(): string {
return this.modelVersion;
}
}

const Akari = new AkariProto();
export default Akari;
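A hedged sketch of driving the new Akari singleton; the input strings are placeholders, and the label semantics are inferred from the worker code later in this diff rather than spelled out here.

import Akari from "lib/ml/akari.ts";

await Akari.init(); // loads both ONNX sessions, then the Jina tokenizer
const label = await Akari.classifyVideo(
    "a video title",       // placeholder inputs
    "a video description",
    "tag1,tag2",
);
// label is the argmax over softmax(logits); the classify worker below
// treats label 0 as "do not insert into songs".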
@@ -1,6 +1,12 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";

function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}

// Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
@@ -1,99 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";

const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.17";

let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: PreTrainedTokenizer | null = null;

export async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) {
return;
}

try {
tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);

const [classifierSession, embeddingSession] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
]);

sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession;
logger.log("Filter models initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initializeModels");
}
}

export function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}

async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
if (!tokenizer) {
throw new Error("Tokenizer is not initialized. Call initializeModels() first.");
}
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});

const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);

const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();

const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};

const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}

async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
}
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);

const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}

export async function classifyVideo(
title: string,
description: string,
tags: string,
aid: number,
): Promise<number> {
if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
}
const embeddings = await getONNXEmbeddings([
title,
description,
tags,
], sessionEmbedding);
const probabilities = await runClassification(embeddings);
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
return probabilities.indexOf(Math.max(...probabilities));
}
37 lib/ml/manager.ts Normal file
@@ -0,0 +1,37 @@
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";

export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {};
public models: { [key: string]: string } = {};

constructor() {
}

public async init() {
const modelKeys = Object.keys(this.models);
for (const key of modelKeys) {
try {
this.sessions[key] = await ort.InferenceSession.create(this.models[key]);
logger.log(`Model ${key} initialized`, "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:init");
}
}
}

public getModelSession(key: string): ort.InferenceSession {
if (this.sessions[key] === undefined) {
throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession");
}
return this.sessions[key];
}

public softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
}
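A quick numeric check of the numerically stable softmax above, with illustrative values:

import { AIManager } from "lib/ml/manager.ts";

const probs = new AIManager().softmax(new Float32Array([1, 2, 3]));
// probs ≈ [0.0900, 0.2447, 0.6652]; subtracting max(logits) before exp()
// changes nothing mathematically but prevents overflow for large logits.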
22 lib/ml/mantis.ts Normal file
@@ -0,0 +1,22 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";

const modelPath = "./model/model.onnx";

class MantisProto extends AIManager {
constructor() {
super();
this.models = {
"predictor": modelPath,
};
}

public override async init(): Promise<void> {
await super.init();
}
}

const Mantis = new MantisProto();
export default Mantis;
@@ -1,6 +1,12 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";

function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}

// Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
@@ -1,7 +1,7 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
import { classifyVideo } from "lib/ml/filter_inference.ts";
import Akari from "lib/ml/akari.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
@@ -19,14 +19,14 @@ export const classifyVideoWorker = async (job: Job) => {
const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A";
const tags = videoInfo.tags?.trim() || "empty";
const label = await classifyVideo(title, description, tags, aid);
const label = await Akari.classifyVideo(title, description, tags, aid);
if (label == -1) {
logger.warn(`Failed to classify video ${aid}`, "ml");
}
await insertVideoLabel(client, aid, label);

const exists = await aidExistsInSongs(client, aid);
if (!exists) {
if (!exists && label !== 0) {
await insertIntoSongs(client, aid);
}

@@ -1,228 +1,199 @@
import { Job } from "bullmq";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { db } from "lib/db/init.ts";
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
import {
getShortTermEtaPrediction,
getSongsNearMilestone,
getUnsnapshotedSongs,
songEligibleForMilestoneSnapshot,
} from "lib/db/snapshot.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { redis } from "lib/db/redis.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
findClosestSnapshot,
getLatestSnapshot,
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
hasAtLeast2Snapshots,
scheduleSnapshot,
setSnapshotStatus,
snapshotScheduleExists,
videoHasProcessingSchedule,
} from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { WEEK, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
import { formatSeconds } from "lib/utils/formatSeconds.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts";

async function snapshotScheduled(aid: number) {
try {
return await redis.exists(`cvsa:snapshot:${aid}`);
} catch {
logger.error(`Failed to check scheduled status for ${aid}`, "mq");
return false;
}
}
const priorityMap: { [key: string]: number } = {
"milestone": 1,
"normal": 3,
};

async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
try {
if (value) {
await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
} else {
await redis.del(`cvsa:snapshot:${aid}`);
}
} catch {
logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
}
}

interface SongNearMilestone {
aid: number;
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
replies: number;
}

async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) {
let i = 0;
for (const snapshot of vidoesNearMilestone) {
if (await snapshotScheduled(snapshot.aid)) {
logger.silly(
`Video ${snapshot.aid} is already scheduled for snapshot`,
"mq",
"fn:processMilestoneSnapshots",
);
continue;
}
if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
logger.silly(
`Video ${snapshot.aid} is not eligible for milestone snapshot`,
"mq",
"fn:processMilestoneSnapshots",
);
continue;
}
const factor = Math.floor(i / 8);
const delayTime = factor * SECOND * 2;
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: snapshot.aid,
currentViews: snapshot.views,
snapshotedAt: snapshot.created_at,
}, { delay: delayTime, priority: 1 });
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
i++;
}
}

async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
let i = 0;
for (const aid of unsnapshotedVideos) {
if (await snapshotScheduled(aid)) {
logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
continue;
}
const factor = Math.floor(i / 5);
const delayTime = factor * SECOND * 4;
await SnapshotQueue.add("snapshotVideo", {
aid,
}, { delay: delayTime, priority: 3 });
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
i++;
}
}
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
};

export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect();
try {
const vidoesNearMilestone = await getSongsNearMilestone(client);
await processMilestoneSnapshots(client, vidoesNearMilestone);

const unsnapshotedVideos = await getUnsnapshotedSongs(client);
await processUnsnapshotedVideos(unsnapshotedVideos);
const schedules = await getSnapshotsInNextSecond(client);
for (const schedule of schedules) {
let priority = 3;
if (schedule.type && priorityMap[schedule.type]) {
priority = priorityMap[schedule.type];
}
const aid = Number(schedule.aid);
await SnapshotQueue.add("snapshotVideo", {
aid: aid,
id: Number(schedule.id),
type: schedule.type ?? "normal",
}, { priority });
}
} catch (e) {
logger.error(e as Error);
} finally {
client.release();
}
};

export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
export const closetMilestone = (views: number) => {
if (views < 100000) return 100000;
if (views < 1000000) return 1000000;
return 10000000;
};

const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);

/*
* Returns the minimum ETA in hours for the next snapshot
* @param client - Postgres client
* @param aid - aid of the video
* @returns ETA in hours
*/
const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const latestSnapshot = await getLatestSnapshot(client, aid);
// Immediately dispatch a snapshot if there is no snapshot yet
if (!latestSnapshot) return 0;
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
if (!snapshotsEnough) return 0;

const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;

for (const timeInterval of timeIntervals) {
const date = new Date(currentTimestamp - timeInterval);
const snapshot = await findClosestSnapshot(client, aid, date);
if (!snapshot) continue;
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
const viewsDiff = latestSnapshot.views - snapshot.views;
if (viewsDiff <= 0) continue;
const speed = viewsDiff / (hoursDiff + DELTA);
const target = closetMilestone(latestSnapshot.views);
const viewsToIncrease = target - latestSnapshot.views;
const eta = viewsToIncrease / (speed + DELTA);
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
factor = truncate(factor, 3, 100);
const adjustedETA = eta / factor;
if (adjustedETA < minETAHours) {
minETAHours = adjustedETA;
}
}

if (isNaN(minETAHours)) {
minETAHours = Infinity;
}

return minETAHours;
};

export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
try {
const aid: number = job.data.aid;
const currentViews: number = job.data.currentViews;
const lastSnapshoted: string = job.data.snapshotedAt;
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
if (typeof stat === "number") {
if (stat === -404 || stat === 62002 || stat == 62012) {
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
} else {
await setSnapshotScheduled(aid, false, 0);
}
return;
}
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
if (stat.views >= nextMilestone) {
await setSnapshotScheduled(aid, false, 0);
return;
}
let eta = await getShortTermEtaPrediction(client, aid);
if (eta === null) {
const DELTA = 0.001;
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
const viewsIncrement = stat.views - currentViews;
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
const viewsToIncrease = nextMilestone - stat.views;
eta = viewsToIncrease / (incrementSpeed + DELTA);
}
const scheduledNextSnapshotDelay = eta * SECOND / 3;
const maxInterval = 20 * MINUTE;
const videos = await getVideosNearMilestone(client);
for (const video of videos) {
const aid = Number(video.aid);
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) continue;
const now = Date.now();
const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 4 * HOUR;
const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay, priority: 1 });
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
etaInMins: eta / 60,
});
logger.log(
`Scheduled next milestone snapshot for ${aid} in ${
formatSeconds(delay / 1000)
}, current views: ${stat.views}`,
"mq",
);
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForMilestoneVideoWorker",
);
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: job.data.aid,
currentViews: job.data.currentViews,
snapshotedAt: job.data.snapshotedAt,
}, { delay: 5 * SECOND, priority: 1 });
return;
const targetTime = now + delay;
await scheduleSnapshot(client, aid, "milestone", targetTime);
}
throw e;
} catch (e) {
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
} finally {
client.release();
}
};

export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
try {
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime);
}
} catch (e) {
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
} finally {
client.release();
}
};

export const takeSnapshotForVideoWorker = async (job: Job) => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
return;
}
try {
const { aid } = job.data;
const stat = await insertVideoStats(client, aid, "getVideoInfo");
if (await videoHasProcessingSchedule(client, aid)) {
return `ALREADY_PROCESSING`;
}
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") {
if (stat === -404 || stat === 62002 || stat == 62012) {
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
} else {
await setSnapshotScheduled(aid, false, 0);
await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed");
return `BILI_STATUS_${stat}`;
}
return;
await setSnapshotStatus(client, id, "completed");
if (type === "normal") {
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
return `DONE`;
}
logger.log(`Taken snapshot for ${aid}`, "mq");
if (stat == null) {
setSnapshotScheduled(aid, false, 0);
return;
}
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
});
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
(stat.views >= 900000 && stat.views < 1000000);
if (nearMilestone) {
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay: 0, priority: 1 });
}
await setSnapshotScheduled(aid, false, 0);
if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) return "ETA_TOO_LONG";
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, aid, type, targetTime);
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
await setSnapshotScheduled(job.data.aid, false, 0);
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(client, id, "completed");
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
return;
}
throw e;
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
await setSnapshotStatus(client, id, "failed");
} finally {
client.release();
}
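A worked instance of the ETA heuristic in getAdjustedShortTermETA above, with illustrative numbers; this assumes truncate(x, min, max) from lib/utils/truncate.ts clamps x into [min, max]:

// latest snapshot: 95,000 views; closest snapshot ~1 hour earlier: 94,000 views
// speed       = 1000 / (1 + DELTA)           ≈ 1000 views/hour
// target      = closetMilestone(95000)       = 100,000
// eta         = 5000 / (1000 + DELTA)        ≈ 5 hours
// factor      = log(2.97 / log(5001), 1.14)  ≈ -1.7, clamped to 3 by truncate
// adjustedETA = 5 / 3                        ≈ 1.67 hours, well under the 72-hour cutoff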
@@ -1,24 +1,39 @@
import { MINUTE } from "$std/datetime/constants.ts";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";

export async function initMQ() {
// await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
// every: 1 * MINUTE,
// immediately: true,
// });
// await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
// every: 5 * MINUTE,
// immediately: true,
// });
// await LatestVideosQueue.upsertJobScheduler("collectSongs", {
// every: 3 * MINUTE,
// immediately: true,
// });
await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
immediately: true,
});
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE,
immediately: true,
});
await LatestVideosQueue.upsertJobScheduler("collectSongs", {
every: 3 * MINUTE,
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
every: 1 * SECOND,
immediately: true,
}, {
opts: {
removeOnComplete: 1,
removeOnFail: 1,
},
});

await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
every: 5 * MINUTE,
immediately: true,
});

await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
every: 30 * MINUTE,
immediately: true,
});

logger.log("Message queue initialized.");
}
@@ -21,7 +21,7 @@ interface ProxiesMap {
}

type NetSchedulerErrorCode =
| "NO_AVAILABLE_PROXY"
| "NO_PROXY_AVAILABLE"
| "PROXY_RATE_LIMITED"
| "PROXY_NOT_FOUND"
| "FETCH_ERROR"
@@ -143,10 +143,10 @@ class NetScheduler {
* @param {string} method - The HTTP method to use for the request. Default is "GET".
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No available proxy currently: with error code NO_AVAILABLE_PROXY
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
// find an available proxy
@@ -156,7 +156,7 @@ class NetScheduler {
return await this.proxyRequest<R>(url, proxyName, task, method);
}
}
throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
}

/*
@@ -168,10 +168,11 @@ class NetScheduler {
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - Proxy not found: with error code PROXY_NOT_FOUND
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - Proxy not found: with error code `PROXY_NOT_FOUND`
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async proxyRequest<R>(
url: string,
@@ -255,8 +256,6 @@ class NetScheduler {
}

private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
let rawOutput: null | Uint8Array = null;
let rawErr: null | Uint8Array = null;
try {
const decoder = new TextDecoder();
const output = await new Deno.Command("aliyun", {
@@ -280,15 +279,9 @@ class NetScheduler {
`CVSA-${region}`,
],
}).output();
rawOutput = output.stdout;
rawErr = output.stderr;
const out = decoder.decode(output.stdout);
const rawData = JSON.parse(out);
if (rawData.statusCode !== 200) {
const fileId = randomUUID();
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
logger.log(`Returned non-200 status code. Raw output saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
"ALICLOUD_PROXY_ERR",
@@ -297,12 +290,6 @@ class NetScheduler {
return JSON.parse(JSON.parse(rawData.body)) as R;
}
} catch (e) {
if (rawOutput !== null || rawErr !== null) {
const fileId = randomUUID();
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
logger.log(`Error occurred. Raw output saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest")
}
logger.error(e as Error, "net", "fn:alicloudFcRequest");
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
}
@@ -361,7 +348,8 @@ Execution order for setup:
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Call after addProxy and addTask.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.

In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
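Following that order, a sketch of a setup and request sequence; the argument shapes below are assumptions, since only the method names, the call order, and the NO_PROXY_AVAILABLE error code are fixed by this diff.

// Hypothetical configuration values; only the call order is prescribed above.
netScheduler.addProxy("alicloud-hz", "alicloud-fc", "CVSA-hangzhou");
netScheduler.addTask("getVideoInfo", "bilibili", ["alicloud-hz"]);
netScheduler.setTaskLimiter("getVideoInfo", [{ window: 300 * SECOND, max: 2000 }]);
netScheduler.setProviderLimiter("bilibili", [{ window: 30 * SECOND, max: 10 }]);

try {
    await netScheduler.request<unknown>("https://example.com/api", "getVideoInfo");
} catch (e) {
    if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
        // back off and reschedule, as the snapshot workers in this diff do
    }
}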
@@ -15,12 +15,11 @@ export async function collectSongs(client: Client) {
export async function insertIntoSongs(client: Client, aid: number) {
await client.queryObject(
`
INSERT INTO songs (aid, bvid, published_at, duration)
INSERT INTO songs (aid, published_at, duration)
VALUES (
$1,
(SELECT bvid FROM all_data WHERE aid = $1),
(SELECT published_at FROM all_data WHERE aid = $1),
(SELECT duration FROM all_data WHERE aid = $1)
(SELECT published_at FROM bilibili_metadata WHERE aid = $1),
(SELECT duration FROM bilibili_metadata WHERE aid = $1)
)
ON CONFLICT DO NOTHING
`,
@ -4,6 +4,7 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { ClassifyVideoQueue } from "lib/mq/index.ts";
|
||||
import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts";
|
||||
import { HOUR, SECOND } from "$std/datetime/constants.ts";
|
||||
|
||||
export async function insertVideoInfo(client: Client, aid: number) {
|
||||
const videoExists = await videoExistsInAllData(client, aid);
|
||||
@ -18,25 +19,25 @@ export async function insertVideoInfo(client: Client, aid: number) {
const desc = data.View.desc;
const uid = data.View.owner.mid;
const tags = data.Tags
.filter((tag) => tag.tag_type in ["old_channel", "topic"])
.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
.map((tag) => tag.tag_name).join(",");
const title = data.View.title;
const published_at = formatTimestampToPsql(data.View.pubdate);
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
const duration = data.View.duration;
await client.queryObject(
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration)
`INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`,
[aid, bvid, desc, uid, tags, title, published_at, duration],
);
const userExists = await userExistsInBiliUsers(client, aid);
if (!userExists) {
await client.queryObject(
`INSERT INTO bili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`,
`INSERT INTO bilibili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`,
[uid, data.View.owner.name, data.Card.card.sign, data.Card.follower],
);
} else {
await client.queryObject(
`UPDATE bili_user SET fans = $1 WHERE uid = $2`,
`UPDATE bilibili_user SET fans = $1 WHERE uid = $2`,
[data.Card.follower, uid],
);
}
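The pubdate change above converts the API's Unix-seconds timestamp into milliseconds (SECOND = 1000 and HOUR = 3600000 in $std/datetime/constants.ts) and shifts it forward eight hours, presumably so the formatter emits UTC+8 wall-clock time. A quick worked example:

```ts
import { HOUR, SECOND } from "$std/datetime/constants.ts";

const pubdate = 1710460800; // Unix seconds, as returned by the bilibili API
const shifted = pubdate * SECOND + 8 * HOUR;
// 1710460800 * 1000 + 8 * 3600000 = 1710489600000 ms, i.e. the timestamp offset by +08:00
```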
@ -1,12 +1,28 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";

export async function insertVideoStats(client: Client, aid: number, task: string) {
/*
* Fetch video stats from bilibili API and insert into database
* @returns {Promise<number|VideoSnapshot>}
* A number indicating the status code when receiving non-0 status code from bilibili,
* otherwise a VideoSnapshot object containing the video stats
* @throws {NetSchedulerError} - The error will be thrown in the following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function insertVideoSnapshot(
client: Client,
aid: number,
task: string,
): Promise<number | LatestSnapshotType> {
const data = await getVideoInfo(aid, task);
const time = new Date().getTime();
if (typeof data == 'number') {
if (typeof data == "number") {
return data;
}
const time = new Date().getTime();
const views = data.stat.view;
const danmakus = data.stat.danmaku;
const replies = data.stat.reply;
@ -14,11 +30,19 @@ export async function insertVideoStats(client: Client, aid: number, task: string
const coins = data.stat.coin;
const shares = data.stat.share;
const favorites = data.stat.favorite;
await client.queryObject(`

const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, [aid, views, danmakus, replies, likes, coins, shares, favorites]);
return {
`;
await client.queryObject(
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);

logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");

const snapshot: LatestSnapshotType = {
aid,
views,
danmakus,
@ -27,6 +51,8 @@ export async function insertVideoStats(client: Client, aid: number, task: string
coins,
shares,
favorites,
time
}
time,
};

return snapshot;
}
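A hypothetical caller honoring the @returns contract documented above (everything outside the insertVideoSnapshot call is illustrative, not code from the repository):

```ts
const result = await insertVideoSnapshot(client, aid, "snapshotVideo");
if (typeof result === "number") {
    // bilibili replied with a non-0 status code; no snapshot was written
    logger.log(`bilibili returned status code ${result} for aid ${aid}.`, "net");
} else {
    // result is the LatestSnapshotType row that was just inserted
    logger.log(`Snapshot taken: aid ${result.aid}, ${result.views} views.`, "net");
}
```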
lib/net/bilibili.d.ts (vendored)
@ -26,8 +26,8 @@ interface VideoInfoData {
mid: number;
name: string;
face: string;
},
stat: VideoStats,
};
stat: VideoStats;
}

interface VideoDetailsData {
@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";

/*
* Fetch video metadata from bilibili API
* @param {number} aid - The video's aid
* @param {string} task - The task name used in scheduler. It can be one of the following:
* - snapshotVideo
* - getVideoInfo
* - snapshotMilestoneVideo
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in the following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
const data = await netScheduler.request<VideoInfoResponse>(url, task);
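For example, a caller can distinguish the two documented outcomes like this (a sketch, not code from the repository):

```ts
const info = await getVideoInfo(aid, "getVideoInfo");
if (typeof info === "number") {
    // the bilibili API returned a non-zero error code
    logger.log(`Error code ${info} returned for aid ${aid}.`, "net");
} else {
    // info is VideoInfoData; View.title is available per lib/net/bilibili.d.ts
    logger.log(`Fetched metadata: ${info.View.title}`, "net");
}
```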
@ -1,9 +1,9 @@
export const formatSeconds = (seconds: number) => {
if (seconds < 60) {
return `${(seconds).toFixed(1)}s`;
return `${seconds.toFixed(1)}s`;
}
if (seconds < 3600) {
return `${Math.floor(seconds / 60)}m${seconds % 60}s`;
return `${Math.floor(seconds / 60)}m${(seconds % 60).toFixed(1)}s`;
}
return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`;
};
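The minutes branch now renders the remainder with toFixed(1) instead of interpolating the raw number. Hand-computed examples of the new output, assuming standard JavaScript toFixed rounding (old output in parentheses where it differs):

```ts
formatSeconds(42.33);  // "42.3s"
formatSeconds(125);    // "2m5.0s"  (previously "2m5s")
formatSeconds(125.25); // "2m5.3s"  (previously "2m5.25s")
formatSeconds(7325);   // "2h 2.08m"
```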
@ -4,20 +4,20 @@ from model import CompactPredictor
import torch

def main():
model = CompactPredictor(16).to('cpu', dtype=torch.float32)
model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
model = CompactPredictor(10).to('cpu', dtype=torch.float32)
model.load_state_dict(torch.load('./pred/checkpoints/long_term.pt'))
model.eval()
# inference
initial = 999269
initial = 997029
last = initial
start_time = '2025-03-15 01:03:21'
for i in range(1, 48):
start_time = '2025-03-17 00:13:17'
for i in range(1, 120):
hour = i / 0.5
sec = hour * 3600
time_d = np.log2(sec)
data = [time_d, np.log2(initial+1), # time_delta, current_views
2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067, # grows_feat
0.043993, 0.72057, 28.000902 # time_feat
6.111542, 8.404707, 10.071566, 11.55888, 12.457823, # grows_feat
0.009225, 0.001318, 28.001814 # time_feat
]
np_arr = np.array([data])
tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
@ -25,7 +25,7 @@ def main():
num = output.detach().numpy()[0][0]
views_pred = int(np.exp2(num)) + initial
current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
print(current_time.strftime('%m-%d %H:%M:%S'), views_pred, views_pred - last)
last = views_pred

if __name__ == '__main__':
@ -1,10 +1,10 @@
import { Job, Worker } from "bullmq";
import { ConnectionOptions, Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { initializeModels } from "lib/ml/filter_inference.ts";
import Akari from "lib/ml/akari.ts";

Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
@ -18,7 +18,7 @@ Deno.addSignalListener("SIGTERM", async () => {
Deno.exit();
});

await initializeModels();
Akari.init();

const filterWorker = new Worker(
"classifyVideo",
@ -32,7 +32,7 @@ const filterWorker = new Worker(
break;
}
},
{ connection: redis, concurrency: 2, removeOnComplete: { count: 1000 } },
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } },
);

filterWorker.on("active", () => {
@ -1,21 +1,27 @@
import { Job, Worker } from "bullmq";
import { ConnectionOptions, Job, Worker } from "bullmq";
import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
import {
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
snapshotTickWorker,
takeSnapshotForVideoWorker,
} from "lib/mq/exec/snapshotTick.ts";

Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit();
});

Deno.addSignalListener("SIGTERM", async () => {
logger.log("SIGTERM Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit();
});

@ -36,7 +42,12 @@ const latestVideoWorker = new Worker(
break;
}
},
{ connection: redis, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
{
connection: redis as ConnectionOptions,
concurrency: 6,
removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 },
},
);

latestVideoWorker.on("active", () => {
@ -56,23 +67,26 @@ const snapshotWorker = new Worker(
"snapshot",
async (job: Job) => {
switch (job.name) {
case "scheduleSnapshotTick":
await snapshotTickWorker(job);
break;
case "snapshotMilestoneVideo":
await takeSnapshotForMilestoneVideoWorker(job);
break;
case "snapshotVideo":
await takeSnapshotForVideoWorker(job);
break;
case "snapshotTick":
await snapshotTickWorker(job);
break;
case "collectMilestoneSnapshots":
await collectMilestoneSnapshotsWorker(job);
break;
case "dispatchRegularSnapshots":
await regularSnapshotsWorker(job);
break;
default:
break;
}
},
{ connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } },
{ connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } },
);

snapshotWorker.on("error", (err) => {
const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath);
})
});
test/db/snapshotSchedule.test.ts (new file)
@ -0,0 +1,18 @@
import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";

Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
const client = new Client(postgresConfig);
try {
const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
assertNotEquals(result, null);
const created_at = result!.created_at;
const views = result!.views;
assertInstanceOf(created_at, Date);
assertEquals(typeof views, "number");
} finally {
await client.end();
}
});
test/ml/akari.json (new file)
@ -0,0 +1,22 @@
{
"test1": [
{
"title": "【洛天依】《一花依世界》(2024重调版)|“抬头仰望,夜空多安详”【原创PV付】",
"desc": "本家:BV1Vs411H7JH\n作曲:LS\n作词:杏花包子\n调教:鬼面P\n混音:虎皮猫P\n演唱:洛天依\n曲绘:山下鸭鸭窝\n映像:阿妍\n——————————————————————\n本稿为同人二创,非本家重制",
"tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语",
"label": 2
},
{
"title": "【鏡音レン】アカシア【VOCALOID Cover】",
"desc": "鏡音リン・レン 13th Anniversary\n\nMusic:BUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nust:Maplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger:鏡音レン\n\n氷雨ハルカ\nYoutube :https://t.co/8zuv6g7Acm\nniconico:https://t.co/C6DRfdYAp0\ntwitter :https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.",
"tags": "鏡音レン",
"label": 0
},
{
"title": "【洛天依原创曲】谪星【姆斯塔之谕】",
"desc": "谪星\n\n策划/世界观:听雨\n作词:听雨\n作曲/编曲:太白\n混音:虎皮猫\n人设:以木\n曲绘:Ar极光\n调校:哈士奇p\n视频:苏卿白",
"tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID",
"label": 1
}
]
}
test/ml/akari.test.ts (new file)
@ -0,0 +1,46 @@
import Akari from "lib/ml/akari.ts";
import { assertEquals, assertGreaterOrEqual } from "jsr:@std/assert";
import { join } from "$std/path/join.ts";
import { SECOND } from "$std/datetime/constants.ts";

Deno.test("Akari AI - normal cases accuracy test", async () => {
const path = import.meta.dirname!;
const dataPath = join(path, "akari.json");
const rawData = await Deno.readTextFile(dataPath);
const data = JSON.parse(rawData);
await Akari.init();
for (const testCase of data.test1) {
const result = await Akari.classifyVideo(
testCase.title,
testCase.desc,
testCase.tags,
);
assertEquals(result, testCase.label);
}
});

Deno.test("Akari AI - performance test", async () => {
const path = import.meta.dirname!;
const dataPath = join(path, "akari.json");
const rawData = await Deno.readTextFile(dataPath);
const data = JSON.parse(rawData);
await Akari.init();
const N = 200;
const testCase = data.test1[0];
const title = testCase.title;
const desc = testCase.desc;
const tags = testCase.tags;
const time = performance.now();
for (let i = 0; i < N; i++) {
await Akari.classifyVideo(
title,
desc,
tags,
);
}
const end = performance.now();
const elapsed = (end - time) / SECOND;
const throughput = N / elapsed;
assertGreaterOrEqual(throughput, 100);
console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`);
});