From bce4161501ba4c233f0a7328b2d60c761bb5a4f1 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Tue, 11 Mar 2025 23:40:15 +0800 Subject: [PATCH] improve: better eta prediction --- lib/db/snapshot.ts | 44 +++++++++++++++++++++++++++++--- lib/mq/exec/snapshotTick.ts | 51 ++++++++++++++++++++++++------------- 2 files changed, 75 insertions(+), 20 deletions(-) diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index 0b46b5e..bdad6e0 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -59,20 +59,58 @@ export async function getUnsnapshotedSongs(client: Client) { } export async function getSongSnapshotCount(client: Client, aid: number) { - const queryResult = await client.queryObject<{ count: number }>(` + const queryResult = await client.queryObject<{ count: number }>( + ` SELECT COUNT(*) AS count FROM video_snapshot WHERE aid = $1; - `, [aid]); + `, + [aid], + ); return queryResult.rows[0].count; } +export async function getShortTermEtaPrediction(client: Client, aid: number) { + const queryResult = await client.queryObject<{eta: number}>( + ` + WITH old_snapshot AS ( + SELECT created_at, views + FROM video_snapshot + WHERE aid = $1 AND + NOW() - created_at > '20 min' + ORDER BY created_at DESC + LIMIT 1 + ), + new_snapshot AS ( + SELECT created_at, views + FROM video_snapshot + WHERE aid = $1 + ORDER BY created_at DESC + LIMIT 1 + ) + SELECT + CASE + WHEN n.views > 100000 THEN (1000000 - n.views) / ((n.views - o.views) / (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.01)) + ELSE (100000 - n.views) / ((n.views - o.views) / (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.01)) + END AS eta + FROM old_snapshot o, new_snapshot n; + `, + [aid], + ); + if (queryResult.rows.length === 0) { + return null; + } + return queryResult.rows[0].eta; +} + export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) { const count = await getSongSnapshotCount(client, aid); if (count < 2) { return true; } - const queryResult = await client.queryObject<{ views1: number, created_at1: string, views2: number, created_at2: string }>( + const queryResult = await client.queryObject< + { views1: number; created_at1: string; views2: number; created_at2: string } + >( ` WITH latest_snapshot AS ( SELECT diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 11cb518..12443ff 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -2,7 +2,12 @@ import { Job } from "bullmq"; import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { db } from "lib/db/init.ts"; -import { getSongsNearMilestone, getUnsnapshotedSongs, songEligibleForMilestoneSnapshot } from "lib/db/snapshot.ts"; +import { + getShortTermEtaPrediction, + getSongsNearMilestone, + getUnsnapshotedSongs, + songEligibleForMilestoneSnapshot, +} from "lib/db/snapshot.ts"; import { SnapshotQueue } from "lib/mq/index.ts"; import { insertVideoStats } from "lib/mq/task/getVideoStats.ts"; import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; @@ -50,11 +55,19 @@ async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: So let i = 0; for (const snapshot of vidoesNearMilestone) { if (await snapshotScheduled(snapshot.aid)) { - logger.silly(`Video ${snapshot.aid} is already scheduled for snapshot`, "mq", "fn:processMilestoneSnapshots"); + logger.silly( + `Video ${snapshot.aid} is already scheduled for snapshot`, + "mq", + "fn:processMilestoneSnapshots", + ); continue; } if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) { - logger.silly(`Video ${snapshot.aid} is not eligible for milestone snapshot`, "mq", "fn:processMilestoneSnapshots"); + logger.silly( + `Video ${snapshot.aid} is not eligible for milestone snapshot`, + "mq", + "fn:processMilestoneSnapshots", + ); continue; } const factor = Math.floor(i / 8); @@ -103,14 +116,14 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { const client = await db.connect(); await setSnapshotScheduled(job.data.aid, true, 20 * 60); try { - const { aid, currentViews, snapshotedAt } = job.data; - const lastSnapshoted = snapshotedAt; + const aid: number = job.data.aid; + const currentViews: number = job.data.currentViews; + const lastSnapshoted: string = job.data.snapshotedAt; const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo"); if (typeof stat === "number") { if (stat === -404 || stat === 62002 || stat == 62012) { await setSnapshotScheduled(aid, true, 6 * 60 * 60); - } - else { + } else { await setSnapshotScheduled(aid, false, 0); } return; @@ -120,12 +133,15 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { await setSnapshotScheduled(aid, false, 0); return; } - const DELTA = 0.001; - const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; - const viewsIncrement = stat.views - currentViews; - const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA); - const viewsToIncrease = nextMilestone - stat.views; - const eta = viewsToIncrease / (incrementSpeed + DELTA); + let eta = await getShortTermEtaPrediction(client, aid); + if (eta === null) { + const DELTA = 0.001; + const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; + const viewsIncrement = stat.views - currentViews; + const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA); + const viewsToIncrease = nextMilestone - stat.views; + eta = viewsToIncrease / (incrementSpeed + DELTA); + } const scheduledNextSnapshotDelay = eta * SECOND / 3; const maxInterval = 20 * MINUTE; const minInterval = 1 * SECOND; @@ -134,7 +150,7 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { aid, currentViews: stat.views, snapshotedAt: stat.time, - }, { delay, priority: 1}); + }, { delay, priority: 1 }); await job.updateData({ ...job.data, updatedViews: stat.views, @@ -142,7 +158,9 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { etaInMins: eta / 60, }); logger.log( - `Scheduled next milestone snapshot for ${aid} in ${formatSeconds(delay / 1000)}, current views: ${stat.views}`, + `Scheduled next milestone snapshot for ${aid} in ${ + formatSeconds(delay / 1000) + }, current views: ${stat.views}`, "mq", ); } catch (e) { @@ -174,8 +192,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { if (typeof stat === "number") { if (stat === -404 || stat === 62002 || stat == 62012) { await setSnapshotScheduled(aid, true, 6 * 60 * 60); - } - else { + } else { await setSnapshotScheduled(aid, false, 0); } return;