From 8652ac8fb736ab07b227aaa796d6b1fe299dff4c Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 23 Mar 2025 19:45:52 +0800 Subject: [PATCH] fix: bugs of snapshot scheduling --- lib/db/snapshotSchedule.ts | 56 ++++++++++++++++++++++++------------- lib/mq/exec/snapshotTick.ts | 51 +++++++++++++++++++++++---------- lib/mq/init.ts | 6 ++++ src/worker.ts | 5 +++- 4 files changed, 84 insertions(+), 34 deletions(-) diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 697a680..9ce304b 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -1,7 +1,7 @@ -import { DAY, MINUTE } from "$std/datetime/constants.ts"; -import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; -import { SnapshotScheduleType } from "./schema.d.ts"; +import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts"; +import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts"; +import {SnapshotScheduleType} from "./schema.d.ts"; import logger from "../log/logger.ts"; /* @@ -98,7 +98,8 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start: * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds) */ export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { - const adjustedTime = (await adjustSnapshotTime(client, new Date(targetTime))); + const allowedCount = type === "milestone" ? 2000 : 800; + const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount); logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); return client.queryObject( `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, @@ -110,18 +111,20 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit * @param client PostgreSQL client * @param expectedStartTime The expected snapshot time + * @param allowedCounts The number of snapshots allowed in the 5-minutes windows. * @returns The adjusted actual snapshot time */ export async function adjustSnapshotTime( client: Client, expectedStartTime: Date, + allowedCounts: number = 2000 ): Promise { const findWindowQuery = ` WITH windows AS ( SELECT generate_series( - $1::timestamp, -- Start time: current time truncated to the nearest 5-minute window - $2::timestamp, -- End time: 24 hours after the target time window starts - INTERVAL '5 MINUTES' + $1::timestamp, -- Start time: current time truncated to the nearest 5-minute window + $2::timestamp, -- End time: 24 hours after the target time window starts + INTERVAL '5 MINUTES' ) AS window_start ) SELECT w.window_start @@ -130,30 +133,34 @@ export async function adjustSnapshotTime( AND s.started_at < w.window_start + INTERVAL '5 MINUTES' AND s.status = 'pending' GROUP BY w.window_start - HAVING COUNT(s.*) < 2000 + HAVING COUNT(s.*) < ${allowedCounts} ORDER BY w.window_start LIMIT 1; `; - const now = new Date(new Date().getTime() + 5 * MINUTE); - const nowTruncated = truncateTo5MinInterval(now); - const currentWindowStart = truncateTo5MinInterval(expectedStartTime); - const end = new Date(currentWindowStart.getTime() + 1 * DAY); + const now = new Date(); + const targetTime = expectedStartTime.getTime(); + let start = new Date(targetTime - 2 * HOUR); + if (start.getTime() <= now.getTime()) { + start = now; + } + const startTruncated = truncateTo5MinInterval(start); + const end = new Date(startTruncated.getTime() + 1 * DAY); const windowResult = await client.queryObject<{ window_start: Date }>( findWindowQuery, - [nowTruncated, end], + [startTruncated, end], ); + const windowStart = windowResult.rows[0]?.window_start; if (!windowStart) { return expectedStartTime; } - // Returns windowStart if it is within the next 5 minutes if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) { - return windowStart - } - else { + const randomDelay = Math.floor(Math.random() * 5 * MINUTE); + return new Date(windowStart.getTime() + randomDelay); + } else { return expectedStartTime; } } @@ -189,8 +196,19 @@ export async function getSnapshotsInNextSecond(client: Client) { } export async function setSnapshotStatus(client: Client, id: number, status: string) { - return client.queryObject( + return await client.queryObject( `UPDATE snapshot_schedule SET status = $2 WHERE id = $1`, [id, status], ); } + +export async function getVideosWithoutActiveSnapshotSchedule(client: Client) { + const query: string = ` + SELECT s.aid + FROM songs s + LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + WHERE ss.aid IS NULL + `; + const res = await client.queryObject<{ aid: number }>(query, []); + return res.rows.map((r) => Number(r.aid)); +} diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 253629e..0d1bf44 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -4,7 +4,7 @@ import { getVideosNearMilestone } from "lib/db/snapshot.ts"; import { findClosestSnapshot, getLatestSnapshot, - getSnapshotsInNextSecond, + getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule, hasAtLeast2Snapshots, scheduleSnapshot, setSnapshotStatus, @@ -15,13 +15,14 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import logger from "lib/log/logger.ts"; import { SnapshotQueue } from "lib/mq/index.ts"; -import { insertVideoSnapshot } from "../task/getVideoStats.ts"; -import { NetSchedulerError } from "../scheduler.ts"; -import { setBiliVideoStatus } from "../../db/allData.ts"; -import {truncate} from "../../utils/truncate.ts"; +import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts"; +import { NetSchedulerError } from "lib/mq/scheduler.ts"; +import { setBiliVideoStatus } from "lib/db/allData.ts"; +import { truncate } from "lib/utils/truncate.ts"; const priorityMap: { [key: string]: number } = { "milestone": 1, + "normal": 3, }; const snapshotTypeToTaskMap: { [key: string]: string } = { @@ -73,7 +74,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { const snapshotsEnough = await hasAtLeast2Snapshots(client, aid); if (!snapshotsEnough) return 0; - const currentTimestamp = new Date().getTime() + const currentTimestamp = new Date().getTime(); const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; @@ -90,7 +91,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { const viewsToIncrease = target - latestSnapshot.views; const eta = viewsToIncrease / (speed + DELTA); let factor = log(2.97 / log(viewsToIncrease + 1), 1.14); - factor = truncate(factor, 3, 100) + factor = truncate(factor, 3, 100); const adjustedETA = eta / factor; if (adjustedETA < minETAHours) { minETAHours = adjustedETA; @@ -104,13 +105,13 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { try { const videos = await getVideosNearMilestone(client); for (const video of videos) { - const aid = Number(video.aid) + const aid = Number(video.aid); if (await videoHasActiveSchedule(client, aid)) continue; const eta = await getAdjustedShortTermETA(client, aid); if (eta > 72) continue; const now = Date.now(); const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 60 * MINUTE; + const maxInterval = 4 * HOUR; const minInterval = 1 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; @@ -123,11 +124,31 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { } }; +export const regularSnapshotsWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const aids = await getVideosWithoutActiveSnapshotSchedule(client); + for (const rawAid of aids) { + const aid = Number(rawAid); + if (await videoHasActiveSchedule(client, aid)) continue; + const now = Date.now(); + const targetTime = now + 24 * HOUR; + await scheduleSnapshot(client, aid, "normal", targetTime); + } + } catch (e) { + logger.error(e as Error, "mq", "fn:regularSnapshotsWorker"); + } finally { + client.release(); + } +}; + export const takeSnapshotForVideoWorker = async (job: Job) => { const id = job.data.id; - const aid = job.data.aid; - const task = snapshotTypeToTaskMap[job.data.type] ?? "snapshotVideo"; + const aid = Number(job.data.aid); + const type = job.data.type; + const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo"; const client = await db.connect(); + const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; try { if (await videoHasProcessingSchedule(client, aid)) { return `ALREADY_PROCESSING`; @@ -139,12 +160,14 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { await setSnapshotStatus(client, id, "completed"); return `BILI_STATUS_${stat}`; } + await setSnapshotStatus(client, id, "completed"); + if (type !== "milestone") return `DONE`; const eta = await getAdjustedShortTermETA(client, aid); if (eta > 72) return "ETA_TOO_LONG"; const now = Date.now(); const targetTime = now + eta * HOUR; - await setSnapshotStatus(client, id, "completed"); - await scheduleSnapshot(client, aid, "milestone", targetTime); + await scheduleSnapshot(client, aid, type, targetTime); + return `DONE`; } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { logger.warn( @@ -153,7 +176,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { "fn:takeSnapshotForVideoWorker", ); await setSnapshotStatus(client, id, "completed"); - await scheduleSnapshot(client, aid, "milestone", Date.now() + 5 * SECOND); + await scheduleSnapshot(client, aid, type, Date.now() + retryInterval); return; } logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 17619b2..f019f28 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -24,10 +24,16 @@ export async function initMQ() { removeOnFail: 1, }, }); + await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { every: 5 * MINUTE, immediately: true, }); + await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", { + every: 30 * MINUTE, + immediately: true, + }); + logger.log("Message queue initialized."); } diff --git a/src/worker.ts b/src/worker.ts index 6781035..7362864 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -6,7 +6,7 @@ import { lockManager } from "lib/mq/lockManager.ts"; import { WorkerError } from "lib/mq/schema.ts"; import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; import { - collectMilestoneSnapshotsWorker, + collectMilestoneSnapshotsWorker, regularSnapshotsWorker, snapshotTickWorker, takeSnapshotForVideoWorker, } from "lib/mq/exec/snapshotTick.ts"; @@ -76,6 +76,9 @@ const snapshotWorker = new Worker( case "collectMilestoneSnapshots": await collectMilestoneSnapshotsWorker(job); break; + case "dispatchRegularSnapshots": + await regularSnapshotsWorker(job); + break; default: break; }