feat: snapshot based on persistent schedule

This commit is contained in:
alikia2x (寒寒) 2025-03-23 17:44:59 +08:00
parent b201bfd64d
commit 2c12310e8c
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
9 changed files with 170 additions and 237 deletions

View File

@ -68,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
const rows = queryResult.rows;
return rows.map((row) => row.uid);
}
/**
 * Write a new status value for one video in `bilibili_metadata`.
 *
 * @param client - PostgreSQL client used to run the statement.
 * @param aid - The aid identifying the row to update.
 * @param status - The value stored in the `status` column.
 * @returns The query result of the `UPDATE` statement.
 */
export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
	const sql = `UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`;
	// Placeholder order follows the statement: $1 is the status, $2 is the aid.
	const params = [status, aid];
	const result = await client.queryObject(sql, params);
	return result;
}

12
lib/db/schema.d.ts vendored
View File

@ -32,6 +32,18 @@ export interface VideoSnapshotType {
replies: number;
}
/*
 * Statistics of a single video at one point in time.
 * Used as the row type when reading from `latest_video_snapshot` and as the
 * object returned by `insertVideoSnapshot` after a successful fetch.
 */
export interface LatestSnapshotType {
// The aid of the video the statistics belong to
aid: number;
// Time of the snapshot. `insertVideoSnapshot` fills this with `Date#getTime()`
// (milliseconds since the Unix epoch).
// NOTE(review): the representation for rows read from the database is not visible here — confirm.
time: number;
// View count
views: number;
// Danmaku count
danmakus: number;
// Reply count
replies: number;
// Like count
likes: number;
// Coin count
coins: number;
// Share count
shares: number;
// Favorite count
favorites: number;
}
export interface SnapshotScheduleType {
id: number;
aid: number;

View File

@ -1,35 +1,17 @@
import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { VideoSnapshotType } from "lib/db/schema.d.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
export async function getVideosNearMilestone(client: Client) {
const queryResult = await client.queryObject<VideoSnapshotType>(`
WITH filtered_snapshots AS (
SELECT
vs.*
FROM
video_snapshot vs
WHERE
(vs.views >= 90000 AND vs.views < 100000) OR
(vs.views >= 900000 AND vs.views < 1000000)
),
ranked_snapshots AS (
SELECT
fs.*,
ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn,
MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid
FROM
filtered_snapshots fs
INNER JOIN
songs s ON fs.aid = s.aid
)
SELECT
rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies
FROM
ranked_snapshots rs
WHERE
rs.rn = 1;
// Latest snapshot of every non-deleted song whose view count sits just below
// a milestone (100k, 1M or 10M views).
// The three view ranges are grouped in one pair of parentheses on purpose:
// in SQL `AND` binds tighter than `OR`, so without the group the
// `s.deleted = false` filter would only guard the first range and deleted
// songs in the other two ranges would still be returned.
const queryResult = await client.queryObject<LatestSnapshotType>(`
	SELECT ls.*
	FROM latest_video_snapshot ls
	INNER JOIN
		songs s ON ls.aid = s.aid
	WHERE
		s.deleted = false AND
		(
			(ls.views >= 90000 AND ls.views < 100000) OR
			(ls.views >= 900000 AND ls.views < 1000000) OR
			(ls.views >= 9900000 AND ls.views < 10000000)
		)
`);
return queryResult.rows.map((row) => {
return {
@ -38,161 +20,3 @@ export async function getVideosNearMilestone(client: Client) {
};
});
}
/**
 * List the songs that do not have any row in `video_snapshot` yet.
 *
 * @param client - PostgreSQL client used to run the query.
 * @returns The aids of all songs without a snapshot, converted to `number`.
 */
export async function getUnsnapshotedSongs(client: Client) {
	// The LEFT JOIN keeps songs without a matching snapshot; those are the
	// rows where the snapshot side is NULL.
	const sql = `
SELECT DISTINCT s.aid
FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`;
	const { rows } = await client.queryObject<{ aid: bigint }>(sql);
	const aids: number[] = [];
	for (const { aid } of rows) {
		// The query is typed as returning bigint; callers work with plain numbers.
		aids.push(Number(aid));
	}
	return aids;
}
/**
 * Count the rows stored in `video_snapshot` for one video.
 *
 * @param client - PostgreSQL client used to run the query.
 * @param aid - The aid of the video to count snapshots for.
 * @returns The `count` column of the first result row.
 */
export async function getSongSnapshotCount(client: Client, aid: number) {
	const sql = `
SELECT COUNT(*) AS count
FROM video_snapshot
WHERE aid = $1;
`;
	const result = await client.queryObject<{ count: number }>(sql, [aid]);
	// NOTE(review): PostgreSQL COUNT(*) yields a bigint; the driver may hand
	// back a bigint rather than a number here — confirm.
	const [firstRow] = result.rows;
	return firstRow.count;
}
/**
 * Estimate how long a video needs to reach its next view milestone, based on
 * the growth between its newest snapshot and the newest snapshot that is more
 * than 20 minutes old.
 *
 * The query divides the remaining views (to 1,000,000 when the newest snapshot
 * has more than 100,000 views, otherwise to 100,000) by the views gained per
 * second between the two snapshots.
 *
 * @param client - PostgreSQL client used to run the query.
 * @param aid - The aid of the video.
 * @returns The `eta` value computed by the query, or `null` when the query
 * yields no row (i.e. one of the two snapshots does not exist).
 */
export async function getShortTermEtaPrediction(client: Client, aid: number) {
	const sql = `
WITH old_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 AND
NOW() - created_at > '20 min'
ORDER BY created_at DESC
LIMIT 1
),
new_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
)
SELECT
CASE
WHEN n.views > 100000
THEN
(1000000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
ELSE
(100000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
END AS eta
FROM old_snapshot o, new_snapshot n;
`;
	const { rows } = await client.queryObject<{ eta: number }>(sql, [aid]);
	// The cross join of the two CTEs is empty when either snapshot is missing.
	return rows.length > 0 ? rows[0].eta : null;
}
/**
 * Get the time that has passed since the newest snapshot of a video.
 *
 * @param client - PostgreSQL client used to run the query.
 * @param aid - The aid of the video.
 * @returns The seconds between the newest snapshot's `created_at` and the
 * database's `NOW()`, or `null` when the video has no snapshot.
 */
export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) {
	const sql = `
SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1;
`;
	const { rows } = await client.queryObject<{ interval: number }>(sql, [aid]);
	return rows.length > 0 ? rows[0].interval : null;
}
/**
 * Decide whether a video should receive a milestone snapshot, i.e. whether it
 * is expected to reach its next view milestone (100,000 or 1,000,000) within
 * three days at its recent growth rate.
 *
 * The growth rate is taken from the latest snapshot and one older snapshot.
 * The query prefers the older snapshot that is furthest away while still being
 * at most 3 days before the latest one; if none exists it falls back to the
 * closest snapshot older than 3 days.
 *
 * @param client - PostgreSQL client used to run the queries.
 * @param aid - The aid of the video.
 * @returns `true` when there is not enough data to judge (fewer than two
 * snapshots or no usable pair) or when the projected time to the next
 * milestone is at most three days; `false` when the view count did not grow
 * or the projection exceeds three days.
 */
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
	const count = await getSongSnapshotCount(client, aid);
	if (count < 2) {
		// A growth rate cannot be derived from fewer than two snapshots.
		return true;
	}
	const queryResult = await client.queryObject<
		{ views1: number; created_at1: string; views2: number; created_at2: string }
	>(
		`
WITH latest_snapshot AS (
SELECT
aid,
views,
created_at
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
),
pairs AS (
SELECT
a.views AS views1,
a.created_at AS created_at1,
b.views AS views2,
b.created_at AS created_at2,
(b.created_at - a.created_at) AS interval
FROM video_snapshot a
JOIN latest_snapshot b
ON a.aid = b.aid
AND a.created_at < b.created_at
)
SELECT
views1,
created_at1,
views2,
created_at2
FROM (
SELECT
*,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
) AS rn
FROM pairs
) ranked
WHERE rn = 1;
`,
		[aid],
	);
	if (queryResult.rows.length === 0) {
		return true;
	}
	const recentViewsData = queryResult.rows[0];
	const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
	const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
	// SECOND is a millisecond count, so this converts the delta to seconds.
	const intervalSec = (time2 - time1) / SECOND;
	const views1 = recentViewsData.views1;
	const views2 = recentViewsData.views2;
	const viewsDiff = views2 - views1;
	if (viewsDiff <= 0) {
		// No growth (or a shrinking count) never reaches the milestone; a
		// negative delta would otherwise produce a negative ETA that wrongly
		// passes the threshold check below.
		return false;
	}
	const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
	const expectedViewsDiff = nextMilestone - views2;
	const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
	// DAY is expressed in milliseconds while the projection is in seconds, so
	// the threshold has to be converted before comparing.
	const maxIntervalSec = 3 * DAY / SECOND;
	return expectedIntervalSec <= maxIntervalSec;
}

View File

@ -14,6 +14,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
return res.rows.length > 0;
}
/**
 * Check whether a video currently has a schedule in the `processing` state.
 *
 * @param client - PostgreSQL client used to run the query.
 * @param aid - The aid of the video.
 * @returns `true` when at least one `snapshot_schedule` row for the video has
 * status `processing`.
 */
export async function videoHasProcessingSchedule(client: Client, aid: number) {
	const sql = `SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`;
	const { rows } = await client.queryObject<{ status: string }>(sql, [aid]);
	return rows.length !== 0;
}
interface Snapshot {
created_at: number;
views: number;
@ -156,9 +164,20 @@ function truncateTo5MinInterval(timestamp: Date): Date {
}
export async function getSnapshotsInNextSecond(client: Client) {
const res = await client.queryObject<SnapshotScheduleType>(
`SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`,
[],
);
const query = `
SELECT *
FROM snapshot_schedule
WHERE started_at
BETWEEN NOW() - INTERVAL '5 seconds'
AND NOW() + INTERVAL '1 seconds'
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;
}
/**
 * Set the status of one `snapshot_schedule` row.
 *
 * @param client - PostgreSQL client used to run the statement.
 * @param id - The id of the schedule row to update.
 * @param status - The value stored in the `status` column.
 * @returns The query result of the `UPDATE` statement.
 */
export async function setSnapshotStatus(client: Client, id: number, status: string) {
	const sql = `UPDATE snapshot_schedule SET status = $2 WHERE id = $1`;
	// Placeholder order follows the statement: $1 is the id, $2 is the status.
	const params = [id, status];
	return client.queryObject(sql, params);
}

View File

@ -6,27 +6,44 @@ import {
getLatestSnapshot,
getSnapshotsInNextSecond,
scheduleSnapshot,
setSnapshotStatus,
videoHasActiveSchedule,
videoHasProcessingSchedule,
} from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR, MINUTE } from "$std/datetime/constants.ts";
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "../task/getVideoStats.ts";
import { NetSchedulerError } from "../scheduler.ts";
import { setBiliVideoStatus } from "../../db/allData.ts";
// Queue priority per schedule type. snapshotTickWorker looks the schedule's
// type up here and falls back to priority 3 for types without an entry.
// NOTE(review): presumably a lower number means the job is processed earlier — confirm against the queue library.
const priorityMap: { [key: string]: number } = {
"milestone": 1,
};
// Task name handed to insertVideoSnapshot for each schedule type.
// takeSnapshotForVideoWorker falls back to "snapshotVideo" for unknown types.
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
};
export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect();
try {
const schedules = await getSnapshotsInNextSecond(client);
for (const schedule of schedules) {
let priority = 3;
if (schedule.type && priorityMap[schedule.type])
if (schedule.type && priorityMap[schedule.type]) {
priority = priorityMap[schedule.type];
await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority });
}
await SnapshotQueue.add("snapshotVideo", {
aid: schedule.aid,
id: schedule.id,
type: schedule.type ?? "normal",
}, { priority });
}
} catch (e) {
logger.error(e as Error);
} finally {
client.release();
}
@ -52,7 +69,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
if (!latestSnapshot) return 0;
const currentTimestamp = Date.now();
const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;
@ -94,6 +111,42 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
}
};
export const takeSnapshotForVideoWorker = async (_job: Job) => {
// TODO: implement
/*
 * Worker that executes one scheduled snapshot.
 *
 * Expects `job.data` to carry the schedule `id`, the video `aid` and the
 * schedule `type` (as enqueued by snapshotTickWorker).
 *
 * Flow:
 * - Skip the job when the video already has a schedule in `processing` state.
 * - Mark this schedule as `processing` and take the snapshot.
 * - When bilibili answered with a status code instead of data, store that
 *   code as the video's status and finish.
 * - Otherwise mark the schedule as `completed` and, when the adjusted ETA is
 *   at most 72 hours, schedule the next milestone snapshot at that ETA.
 *
 * @returns `ALREADY_PROCESSING`, `BILI_STATUS_<code>` or `ETA_TOO_LONG` for
 * the corresponding early exits, otherwise `undefined`.
 */
export const takeSnapshotForVideoWorker = async (job: Job) => {
	const id = job.data.id;
	const aid = job.data.aid;
	const task = snapshotTypeToTaskMap[job.data.type] ?? "snapshotVideo";
	const client = await db.connect();
	try {
		if (await videoHasProcessingSchedule(client, aid)) {
			return `ALREADY_PROCESSING`;
		}
		await setSnapshotStatus(client, id, "processing");
		const stat = await insertVideoSnapshot(client, aid, task);
		if (typeof stat === "number") {
			await setBiliVideoStatus(client, aid, stat);
			await setSnapshotStatus(client, id, "completed");
			return `BILI_STATUS_${stat}`;
		}
		// The snapshot is stored at this point, so the schedule is done no
		// matter whether a follow-up gets scheduled. Marking it before the ETA
		// check keeps the `ETA_TOO_LONG` exit from leaving the row stuck in
		// `processing`, which would make every later job for this video bail
		// out with `ALREADY_PROCESSING`.
		await setSnapshotStatus(client, id, "completed");
		const eta = await getAdjustedShortTermETA(client, aid);
		if (eta > 72) return "ETA_TOO_LONG";
		// `eta` is in hours; HOUR converts it to the millisecond timestamp scale of Date.now().
		const targetTime = Date.now() + eta * HOUR;
		await scheduleSnapshot(client, aid, "milestone", targetTime);
	} catch (e) {
		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
			logger.warn(
				`No available proxy for aid ${job.data.aid}.`,
				"mq",
				"fn:takeSnapshotForVideoWorker",
			);
			// No proxy could serve the request: close this schedule and retry
			// with a fresh one five seconds from now.
			await setSnapshotStatus(client, id, "completed");
			await scheduleSnapshot(client, aid, "milestone", Date.now() + 5 * SECOND);
			return;
		}
		logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
		await setSnapshotStatus(client, id, "failed");
	} finally {
		client.release();
	}
};

View File

@ -21,7 +21,7 @@ interface ProxiesMap {
}
type NetSchedulerErrorCode =
| "NO_AVAILABLE_PROXY"
| "NO_PROXY_AVAILABLE"
| "PROXY_RATE_LIMITED"
| "PROXY_NOT_FOUND"
| "FETCH_ERROR"
@ -143,10 +143,10 @@ class NetScheduler {
* @param {string} method - The HTTP method to use for the request. Default is "GET".
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No available proxy currently: with error code NO_AVAILABLE_PROXY
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_PROXY_ERR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
// find an available proxy
@ -156,7 +156,7 @@ class NetScheduler {
return await this.proxyRequest<R>(url, proxyName, task, method);
}
}
throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
}
/*
@ -168,10 +168,11 @@ class NetScheduler {
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - Proxy not found: with error code PROXY_NOT_FOUND
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - Proxy not found: with error code `PROXY_NOT_FOUND`
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_PROXY_ERR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async proxyRequest<R>(
url: string,
@ -255,8 +256,6 @@ class NetScheduler {
}
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
let rawOutput: null | Uint8Array = null;
let rawErr: null | Uint8Array = null;
try {
const decoder = new TextDecoder();
const output = await new Deno.Command("aliyun", {
@ -280,19 +279,9 @@ class NetScheduler {
`CVSA-${region}`,
],
}).output();
rawOutput = output.stdout;
rawErr = output.stderr;
const out = decoder.decode(output.stdout);
const rawData = JSON.parse(out);
if (rawData.statusCode !== 200) {
const fileId = randomUUID();
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
logger.log(
`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
"net",
"fn:alicloudFcRequest",
);
throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
"ALICLOUD_PROXY_ERR",
@ -301,16 +290,6 @@ class NetScheduler {
return JSON.parse(JSON.parse(rawData.body)) as R;
}
} catch (e) {
if (rawOutput !== null || rawErr !== null) {
const fileId = randomUUID();
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
logger.log(
`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
"net",
"fn:alicloudFcRequest",
);
}
logger.error(e as Error, "net", "fn:alicloudFcRequest");
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
}
@ -369,7 +348,8 @@ Execution order for setup:
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Call after addProxy and addTask.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).

View File

@ -1,12 +1,27 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
export async function insertVideoStats(client: Client, aid: number, task: string) {
/*
* Fetch video stats from bilibili API and insert them into the database
* @returns {Promise<number|LatestSnapshotType>}
* A number indicating the status code when receiving a non-0 status code from bilibili,
* otherwise a LatestSnapshotType object containing the video stats
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_PROXY_ERR`
*/
export async function insertVideoSnapshot(
client: Client,
aid: number,
task: string,
): Promise<number | LatestSnapshotType> {
const data = await getVideoInfo(aid, task);
const time = new Date().getTime();
if (typeof data == "number") {
return data;
}
const time = new Date().getTime();
const views = data.stat.view;
const danmakus = data.stat.danmaku;
const replies = data.stat.reply;
@ -14,14 +29,17 @@ export async function insertVideoStats(client: Client, aid: number, task: string
const coins = data.stat.coin;
const shares = data.stat.share;
const favorites = data.stat.favorite;
await client.queryObject(
`
const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`,
`;
await client.queryObject(
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);
return {
const snapshot: LatestSnapshotType = {
aid,
views,
danmakus,
@ -32,4 +50,6 @@ export async function insertVideoStats(client: Client, aid: number, task: string
favorites,
time,
};
return snapshot;
}

View File

@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
/*
* Fetch video metadata from bilibili API
* @param {number} aid - The video's aid
* @param {string} task - The task name used in scheduler. It can be one of the following:
* - snapshotVideo
* - getVideoInfo
* - snapshotMilestoneVideo
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_PROXY_ERR`
*/
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
const data = await netScheduler.request<VideoInfoResponse>(url, task);

View File

@ -40,7 +40,12 @@ const latestVideoWorker = new Worker(
break;
}
},
{ connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
{
connection: redis as ConnectionOptions,
concurrency: 6,
removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 },
},
);
latestVideoWorker.on("active", () => {