From 2e8ed7ce7014f16bbaac49f20ed43d9320096eef Mon Sep 17 00:00:00 2001 From: alikia2x Date: Thu, 20 Mar 2025 01:57:33 +0800 Subject: [PATCH] add: ETA estimation for short-term snapshot --- lib/db/snapshotSchedule.ts | 116 +++++++++++------------------------- lib/mq/exec/snapshotTick.ts | 44 +++++++++++++- 2 files changed, 77 insertions(+), 83 deletions(-) diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 3b77fce..583d06a 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -1,101 +1,53 @@ -import { DAY, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -/* +/* Returns true if the specified `aid` has at least one record with "pending" or "processing" status. */ export async function videoHasActiveSchedule(client: Client, aid: number) { - const res = await client.queryObject<{ status: string }>( - `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`, - [aid], - ); - return res.rows.length > 0; + const res = await client.queryObject<{ status: string }>( + `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`, + [aid], + ); + return res.rows.length > 0; } interface Snapshot { - created_at: Date; - views: number; + created_at: number; + views: number; } export async function findClosestSnapshot( - client: Client, - aid: number, - targetTime: Date + client: Client, + aid: number, + targetTime: Date, ): Promise { - const query = ` + const query = ` SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) ASC LIMIT 1 `; - const result = await client.queryObject<{ created_at: string; views: number }>( - query, - [aid, targetTime.toISOString()] - ); - if (result.rows.length === 0) return null; - const row = result.rows[0]; - return { - created_at: new Date(row.created_at), - views: row.views, - }; + const result = await client.queryObject<{ created_at: string; views: number }>( + query, + [aid, targetTime.toISOString()], + ); + if (result.rows.length === 0) return null; + const row = result.rows[0]; + return { + created_at: new Date(row.created_at).getTime(), + views: row.views, + }; } -export async function getShortTermTimeFeaturesForVideo( - client: Client, - aid: number, - initialTimestampMiliseconds: number -): Promise { - const initialTime = new Date(initialTimestampMiliseconds); - const timeWindows = [ - [ 5 * MINUTE, 0 * MINUTE], - [ 15 * MINUTE, 0 * MINUTE], - [ 40 * MINUTE, 0 * MINUTE], - [ 1 * HOUR, 0 * HOUR], - [ 2 * HOUR, 1 * HOUR], - [ 3 * HOUR, 2 * HOUR], - [ 3 * HOUR, 0 * HOUR], - [ 6 * HOUR, 0 * HOUR], - [18 * HOUR, 12 * HOUR], - [ 1 * DAY, 0 * DAY], - [ 3 * DAY, 0 * DAY], - [ 7 * DAY, 0 * DAY] - ]; - - const results: number[] = []; - - for (const [windowStart, windowEnd] of timeWindows) { - const targetTimeStart = new Date(initialTime.getTime() - windowStart); - const targetTimeEnd = new Date(initialTime.getTime() - windowEnd); - - const startRecord = await findClosestSnapshot(client, aid, targetTimeStart); - const endRecord = await findClosestSnapshot(client, aid, targetTimeEnd); - - if (!startRecord || !endRecord) { - results.push(NaN); - continue; - } - - const timeDiffSeconds = - (endRecord.created_at.getTime() - startRecord.created_at.getTime()) / 1000; - const windowDuration = windowStart - windowEnd; - - let scale = 0; - if (windowDuration > 0) { - scale = timeDiffSeconds / windowDuration; - } - - const viewsDiff = endRecord.views - startRecord.views; - const adjustedViews = Math.max(viewsDiff, 1); - - let result: number; - if (scale > 0) { - result = Math.log2(adjustedViews / scale + 1); - } else { - result = Math.log2(adjustedViews + 1); - } - - results.push(result); - } - - return results; -} \ No newline at end of file +export async function getLatestSnapshot(client: Client, aid: number): Promise{ + const res = await client.queryObject<{ created_at: string; views: number }>( + `SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`, + [aid], + ); + if (res.rows.length === 0) return null; + const row = res.rows[0]; + return { + created_at: new Date(row.created_at).getTime(), + views: row.views, + } +} diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 65564d0..9fcc604 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -1,7 +1,9 @@ import { Job } from "bullmq"; import { db } from "lib/db/init.ts"; import { getVideosNearMilestone } from "lib/db/snapshot.ts"; -import { videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts"; +import { findClosestSnapshot, getLatestSnapshot, videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { HOUR, MINUTE } from "$std/datetime/constants.ts"; export const snapshotTickWorker = async (_job: Job) => { const client = await db.connect(); @@ -12,12 +14,52 @@ export const snapshotTickWorker = async (_job: Job) => { } }; +export const closetMilestone = (views: number) => { + if (views < 100000) return 100000; + if (views < 1000000) return 1000000; + return 10000000; +}; + +const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); + +const getAdjustedShortTermETA = async (client: Client, aid: number) => { + const latestSnapshot = await getLatestSnapshot(client, aid); + // Immediately dispatch a snapshot if there is no snapshot yet + if (!latestSnapshot) return 0; + + const currentTimestamp = Date.now(); + const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; + const DELTA = 0.00001; + let minETAHours = Infinity; + + for (const timeInterval of timeIntervals) { + const date = new Date(currentTimestamp - timeInterval); + const snapshot = await findClosestSnapshot(client, aid, date); + if (!snapshot) continue; + const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR; + const viewsDiff = snapshot.views - latestSnapshot.views; + const speed = viewsDiff / (hoursDiff + DELTA); + const target = closetMilestone(latestSnapshot.views); + const viewsToIncrease = target - latestSnapshot.views; + const eta = viewsToIncrease / (speed + DELTA); + const factor = log(2.97 / log(viewsToIncrease + 1), 1.14); + const adjustedETA = eta / factor; + if (adjustedETA < minETAHours) { + minETAHours = adjustedETA; + } + } + return minETAHours; +}; + export const collectMilestoneSnapshotsWorker = async (_job: Job) => { const client = await db.connect(); try { const videos = await getVideosNearMilestone(client); for (const video of videos) { if (await videoHasActiveSchedule(client, video.aid)) continue; + const eta = await getAdjustedShortTermETA(client, video.aid); + if (eta > 72) continue; + // TODO: dispatch snapshot job } } catch (_e) { //