From 18fc9752bb06fbdb67cfa28b6b37eeab723f0161 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 23 Mar 2025 18:46:25 +0800 Subject: [PATCH] fix: several bugs of snapshot scheduling --- lib/db/snapshotSchedule.ts | 49 +++++++++++++++++++++++-------------- lib/mq/exec/snapshotTick.ts | 30 ++++++++++++++++------- lib/mq/init.ts | 5 ++++ src/worker.ts | 2 ++ 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index bd4e805..697a680 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -2,6 +2,7 @@ import { DAY, MINUTE } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; import { SnapshotScheduleType } from "./schema.d.ts"; +import logger from "../log/logger.ts"; /* Returns true if the specified `aid` has at least one record with "pending" or "processing" status. @@ -51,6 +52,14 @@ export async function findClosestSnapshot( }; } +export async function hasAtLeast2Snapshots(client: Client, aid: number) { + const res = await client.queryObject<{ count: number }>( + `SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`, + [aid], + ); + return res.rows[0].count >= 2; +} + export async function getLatestSnapshot(client: Client, aid: number): Promise { const res = await client.queryObject<{ created_at: string; views: number }>( `SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`, @@ -89,10 +98,11 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start: * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds) */ export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { - const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime)); + const adjustedTime = (await adjustSnapshotTime(client, new Date(targetTime))); + logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); return client.queryObject( `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, - [aid, type, ajustedTime.toISOString()], + [aid, type, adjustedTime.toISOString()], ); } @@ -124,25 +134,28 @@ export async function adjustSnapshotTime( ORDER BY w.window_start LIMIT 1; `; - for (let i = 0; i < 7; i++) { - const now = new Date(new Date().getTime() + 5 * MINUTE); - const nowTruncated = truncateTo5MinInterval(now); - const currentWindowStart = truncateTo5MinInterval(expectedStartTime); - const end = new Date(currentWindowStart.getTime() + 1 * DAY); + const now = new Date(new Date().getTime() + 5 * MINUTE); + const nowTruncated = truncateTo5MinInterval(now); + const currentWindowStart = truncateTo5MinInterval(expectedStartTime); + const end = new Date(currentWindowStart.getTime() + 1 * DAY); - const windowResult = await client.queryObject<{ window_start: Date }>( - findWindowQuery, - [nowTruncated, end], - ); + const windowResult = await client.queryObject<{ window_start: Date }>( + findWindowQuery, + [nowTruncated, end], + ); - const windowStart = windowResult.rows[0]?.window_start; - if (!windowStart) { - continue; - } - - return windowStart; + const windowStart = windowResult.rows[0]?.window_start; + if (!windowStart) { + return expectedStartTime; + } + + // Returns windowStart if it is within the next 5 minutes + if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) { + return windowStart + } + else { + return expectedStartTime; } - return expectedStartTime; } /** diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 453ac65..253629e 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -5,6 +5,7 @@ import { findClosestSnapshot, getLatestSnapshot, getSnapshotsInNextSecond, + hasAtLeast2Snapshots, scheduleSnapshot, setSnapshotStatus, videoHasActiveSchedule, @@ -17,6 +18,7 @@ import { SnapshotQueue } from "lib/mq/index.ts"; import { insertVideoSnapshot } from "../task/getVideoStats.ts"; import { NetSchedulerError } from "../scheduler.ts"; import { setBiliVideoStatus } from "../../db/allData.ts"; +import {truncate} from "../../utils/truncate.ts"; const priorityMap: { [key: string]: number } = { "milestone": 1, @@ -36,8 +38,9 @@ export const snapshotTickWorker = async (_job: Job) => { if (schedule.type && priorityMap[schedule.type]) { priority = priorityMap[schedule.type]; } + const aid = Number(schedule.aid); await SnapshotQueue.add("snapshotVideo", { - aid: schedule.aid, + aid: aid, id: schedule.id, type: schedule.type ?? "normal", }, { priority }); @@ -67,8 +70,10 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { const latestSnapshot = await getLatestSnapshot(client, aid); // Immediately dispatch a snapshot if there is no snapshot yet if (!latestSnapshot) return 0; + const snapshotsEnough = await hasAtLeast2Snapshots(client, aid); + if (!snapshotsEnough) return 0; - const currentTimestamp = Date.now(); + const currentTimestamp = new Date().getTime() const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; @@ -77,13 +82,15 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { const date = new Date(currentTimestamp - timeInterval); const snapshot = await findClosestSnapshot(client, aid, date); if (!snapshot) continue; - const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR; - const viewsDiff = snapshot.views - latestSnapshot.views; + const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR; + const viewsDiff = latestSnapshot.views - snapshot.views; + if (viewsDiff <= 0) continue; const speed = viewsDiff / (hoursDiff + DELTA); const target = closetMilestone(latestSnapshot.views); const viewsToIncrease = target - latestSnapshot.views; const eta = viewsToIncrease / (speed + DELTA); - const factor = log(2.97 / log(viewsToIncrease + 1), 1.14); + let factor = log(2.97 / log(viewsToIncrease + 1), 1.14); + factor = truncate(factor, 3, 100) const adjustedETA = eta / factor; if (adjustedETA < minETAHours) { minETAHours = adjustedETA; @@ -97,12 +104,17 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { try { const videos = await getVideosNearMilestone(client); for (const video of videos) { - if (await videoHasActiveSchedule(client, video.aid)) continue; - const eta = await getAdjustedShortTermETA(client, video.aid); + const aid = Number(video.aid) + if (await videoHasActiveSchedule(client, aid)) continue; + const eta = await getAdjustedShortTermETA(client, aid); if (eta > 72) continue; const now = Date.now(); - const targetTime = now + eta * HOUR; - await scheduleSnapshot(client, video.aid, "milestone", targetTime); + const scheduledNextSnapshotDelay = eta * HOUR; + const maxInterval = 60 * MINUTE; + const minInterval = 1 * SECOND; + const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); + const targetTime = now + delay; + await scheduleSnapshot(client, aid, "milestone", targetTime); } } catch (e) { logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 688dd4a..17619b2 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -18,6 +18,11 @@ export async function initMQ() { await SnapshotQueue.upsertJobScheduler("snapshotTick", { every: 1 * SECOND, immediately: true, + }, { + opts: { + removeOnComplete: 1, + removeOnFail: 1, + }, }); await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { every: 5 * MINUTE, diff --git a/src/worker.ts b/src/worker.ts index ba2b510..6781035 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -14,12 +14,14 @@ import { Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); await latestVideoWorker.close(true); + await snapshotWorker.close(true); Deno.exit(); }); Deno.addSignalListener("SIGTERM", async () => { logger.log("SIGTERM Received: Shutting down workers...", "mq"); await latestVideoWorker.close(true); + await snapshotWorker.close(true); Deno.exit(); });