add: ETA estimation for short-term snapshot

This commit is contained in:
alikia2x (寒寒) 2025-03-20 01:57:33 +08:00
parent b07d0c18f9
commit 2e8ed7ce70
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
2 changed files with 77 additions and 83 deletions

View File

@ -1,101 +1,53 @@
import { DAY, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
/*
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
[aid],
);
return res.rows.length > 0;
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
[aid],
);
return res.rows.length > 0;
}
interface Snapshot {
created_at: Date;
views: number;
created_at: number;
views: number;
}
export async function findClosestSnapshot(
client: Client,
aid: number,
targetTime: Date
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
const query = `
SELECT created_at, views FROM video_snapshot
WHERE aid = $1
ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) ASC
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()]
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at),
views: row.views,
};
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
export async function getShortTermTimeFeaturesForVideo(
client: Client,
aid: number,
initialTimestampMiliseconds: number
): Promise<number[]> {
const initialTime = new Date(initialTimestampMiliseconds);
const timeWindows = [
[ 5 * MINUTE, 0 * MINUTE],
[ 15 * MINUTE, 0 * MINUTE],
[ 40 * MINUTE, 0 * MINUTE],
[ 1 * HOUR, 0 * HOUR],
[ 2 * HOUR, 1 * HOUR],
[ 3 * HOUR, 2 * HOUR],
[ 3 * HOUR, 0 * HOUR],
[ 6 * HOUR, 0 * HOUR],
[18 * HOUR, 12 * HOUR],
[ 1 * DAY, 0 * DAY],
[ 3 * DAY, 0 * DAY],
[ 7 * DAY, 0 * DAY]
];
const results: number[] = [];
for (const [windowStart, windowEnd] of timeWindows) {
const targetTimeStart = new Date(initialTime.getTime() - windowStart);
const targetTimeEnd = new Date(initialTime.getTime() - windowEnd);
const startRecord = await findClosestSnapshot(client, aid, targetTimeStart);
const endRecord = await findClosestSnapshot(client, aid, targetTimeEnd);
if (!startRecord || !endRecord) {
results.push(NaN);
continue;
}
const timeDiffSeconds =
(endRecord.created_at.getTime() - startRecord.created_at.getTime()) / 1000;
const windowDuration = windowStart - windowEnd;
let scale = 0;
if (windowDuration > 0) {
scale = timeDiffSeconds / windowDuration;
}
const viewsDiff = endRecord.views - startRecord.views;
const adjustedViews = Math.max(viewsDiff, 1);
let result: number;
if (scale > 0) {
result = Math.log2(adjustedViews / scale + 1);
} else {
result = Math.log2(adjustedViews + 1);
}
results.push(result);
}
return results;
}
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null>{
const res = await client.queryObject<{ created_at: string; views: number }>(
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
[aid],
);
if (res.rows.length === 0) return null;
const row = res.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
}
}

View File

@ -1,7 +1,9 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getVideosNearMilestone } from "lib/db/snapshot.ts";
import { videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts";
import { findClosestSnapshot, getLatestSnapshot, videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR, MINUTE } from "$std/datetime/constants.ts";
export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect();
@ -12,12 +14,52 @@ export const snapshotTickWorker = async (_job: Job) => {
}
};
export const closetMilestone = (views: number) => {
if (views < 100000) return 100000;
if (views < 1000000) return 1000000;
return 10000000;
};
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const latestSnapshot = await getLatestSnapshot(client, aid);
// Immediately dispatch a snapshot if there is no snapshot yet
if (!latestSnapshot) return 0;
const currentTimestamp = Date.now();
const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;
for (const timeInterval of timeIntervals) {
const date = new Date(currentTimestamp - timeInterval);
const snapshot = await findClosestSnapshot(client, aid, date);
if (!snapshot) continue;
const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR;
const viewsDiff = snapshot.views - latestSnapshot.views;
const speed = viewsDiff / (hoursDiff + DELTA);
const target = closetMilestone(latestSnapshot.views);
const viewsToIncrease = target - latestSnapshot.views;
const eta = viewsToIncrease / (speed + DELTA);
const factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
const adjustedETA = eta / factor;
if (adjustedETA < minETAHours) {
minETAHours = adjustedETA;
}
}
return minETAHours;
};
export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
try {
const videos = await getVideosNearMilestone(client);
for (const video of videos) {
if (await videoHasActiveSchedule(client, video.aid)) continue;
const eta = await getAdjustedShortTermETA(client, video.aid);
if (eta > 72) continue;
// TODO: dispatch snapshot job
}
} catch (_e) {
//