From 0be961e70958aafd69605443bd79335dc2bc49fd Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 23 Mar 2025 23:31:16 +0800 Subject: [PATCH] improve: target time finding --- lib/db/snapshot.ts | 9 ++- lib/db/snapshotSchedule.ts | 115 +++++++++++++++++------------------- lib/mq/exec/snapshotTick.ts | 11 +++- lib/mq/init.ts | 2 +- src/worker.ts | 3 +- 5 files changed, 72 insertions(+), 68 deletions(-) diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index 34ffe82..c160d79 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -22,11 +22,14 @@ export async function getVideosNearMilestone(client: Client) { } export async function getLatestVideoSnapshot(client: Client, aid: number): Promise { - const queryResult = await client.queryObject(` + const queryResult = await client.queryObject( + ` SELECT * FROM latest_video_snapshot WHERE aid = $1 - `, [aid]); + `, + [aid], + ); if (queryResult.rows.length === 0) { return null; } @@ -35,6 +38,6 @@ export async function getLatestVideoSnapshot(client: Client, aid: number): Promi ...row, aid: Number(row.aid), time: new Date(row.time).getTime(), - } + }; })[0]; } diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 2c56f0d..4377d94 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -1,8 +1,8 @@ -import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts"; -import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts"; -import {SnapshotScheduleType} from "./schema.d.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; +import { SnapshotScheduleType } from "./schema.d.ts"; import logger from "lib/log/logger.ts"; +import { MINUTE } from "$std/datetime/constants.ts"; export async function snapshotScheduleExists(client: Client, id: number) { const res = await client.queryObject<{ id: number }>( @@ -120,78 +120,73 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit * @param client PostgreSQL client * @param expectedStartTime The expected snapshot time - * @param allowedCounts The number of snapshots allowed in the 5-minutes windows. - * @returns The adjusted actual snapshot time + * @param allowedCounts The number of snapshots allowed in a 5-minute window (default: 2000) + * @returns The adjusted actual snapshot time within the first available window */ export async function adjustSnapshotTime( client: Client, expectedStartTime: Date, - allowedCounts: number = 2000 + allowedCounts: number = 2000, ): Promise { + // Query to find the closest available window by checking both past and future windows const findWindowQuery = ` - WITH windows AS ( - SELECT generate_series( - $1::timestamp, -- Start time: current time truncated to the nearest 5-minute window - $2::timestamp, -- End time: 24 hours after the target time window starts - INTERVAL '5 MINUTES' - ) AS window_start - ) - SELECT w.window_start - FROM windows w - LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start - AND s.started_at < w.window_start + INTERVAL '5 MINUTES' + WITH base AS ( + SELECT + date_trunc('minute', $1::timestamp) + - (EXTRACT(minute FROM $1::timestamp)::int % 5 * INTERVAL '1 minute') AS base_time + ), + offsets AS ( + SELECT generate_series(-100, 100) AS "offset" + ), + candidate_windows AS ( + SELECT + (base.base_time + ("offset" * INTERVAL '5 minutes')) AS window_start, + ABS("offset") AS distance + FROM base + CROSS JOIN offsets + ) + SELECT + window_start + FROM + candidate_windows cw + LEFT JOIN + snapshot_schedule s + ON + s.started_at >= cw.window_start + AND s.started_at < cw.window_start + INTERVAL '5 minutes' AND s.status = 'pending' - GROUP BY w.window_start - HAVING COUNT(s.*) < ${allowedCounts} - ORDER BY w.window_start - LIMIT 1; - `; - const now = new Date(); - const targetTime = expectedStartTime.getTime(); - let start = new Date(targetTime - 2 * HOUR); - if (start.getTime() <= now.getTime()) { - start = now; - } - const startTruncated = truncateTo5MinInterval(start); - const end = new Date(startTruncated.getTime() + 1 * DAY); + GROUP BY + cw.window_start, cw.distance + HAVING + COUNT(s.*) < $2 + ORDER BY + cw.distance, cw.window_start + LIMIT 1; + `; - const windowResult = await client.queryObject<{ window_start: Date }>( - findWindowQuery, - [startTruncated, end], - ); + try { + // Execute query to find the first available window + const windowResult = await client.queryObject<{ window_start: Date }>( + findWindowQuery, + [expectedStartTime, allowedCounts], + ); + // If no available window found, return original time (may exceed limit) + if (windowResult.rows.length === 0) { + return expectedStartTime; + } - const windowStart = windowResult.rows[0]?.window_start; - if (!windowStart) { - return expectedStartTime; - } + // Get the target window start time + const windowStart = windowResult.rows[0].window_start; - if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) { + // Add random delay within the 5-minute window to distribute load const randomDelay = Math.floor(Math.random() * 5 * MINUTE); return new Date(windowStart.getTime() + randomDelay); - } else { - return expectedStartTime; + } catch { + return expectedStartTime; // Fallback to original time } } -/** - * Truncate the timestamp to the nearest 5-minute interval - * @param timestamp The timestamp - * @returns The truncated time - */ -function truncateTo5MinInterval(timestamp: Date): Date { - const minutes = timestamp.getMinutes() - (timestamp.getMinutes() % 5); - return new Date( - timestamp.getFullYear(), - timestamp.getMonth(), - timestamp.getDate(), - timestamp.getHours(), - minutes, - 0, - 0, - ); -} - export async function getSnapshotsInNextSecond(client: Client) { const query = ` SELECT * diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 9843788..b94eb81 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -1,10 +1,11 @@ import { Job } from "bullmq"; import { db } from "lib/db/init.ts"; -import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts"; +import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts"; import { findClosestSnapshot, getLatestSnapshot, - getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule, + getSnapshotsInNextSecond, + getVideosWithoutActiveSnapshotSchedule, hasAtLeast2Snapshots, scheduleSnapshot, setSnapshotStatus, @@ -12,7 +13,7 @@ import { videoHasProcessingSchedule, } from "lib/db/snapshotSchedule.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { WEEK, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts"; import logger from "lib/log/logger.ts"; import { SnapshotQueue } from "lib/mq/index.ts"; import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts"; @@ -107,6 +108,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { export const collectMilestoneSnapshotsWorker = async (_job: Job) => { const client = await db.connect(); + const startedAt = Date.now(); try { const videos = await getVideosNearMilestone(client); for (const video of videos) { @@ -120,6 +122,9 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; await scheduleSnapshot(client, aid, "milestone", targetTime); + if (now - startedAt > 25 * MINUTE) { + return; + } } } catch (e) { logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 0e377f1..8466280 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -35,7 +35,7 @@ export async function initMQ() { immediately: true, }); - await SnapshotQueue.removeJobScheduler('scheduleSnapshotTick'); + await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick"); logger.log("Message queue initialized."); } diff --git a/src/worker.ts b/src/worker.ts index 7362864..8198d09 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -6,7 +6,8 @@ import { lockManager } from "lib/mq/lockManager.ts"; import { WorkerError } from "lib/mq/schema.ts"; import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; import { - collectMilestoneSnapshotsWorker, regularSnapshotsWorker, + collectMilestoneSnapshotsWorker, + regularSnapshotsWorker, snapshotTickWorker, takeSnapshotForVideoWorker, } from "lib/mq/exec/snapshotTick.ts";