From 3028dc13c7756e5f66ef34ecb19042fad80ec730 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 24 Mar 2025 02:35:55 +0800 Subject: [PATCH] improve: performance when dispatching jobs --- lib/db/snapshotSchedule.ts | 163 ++++++++++++++++++------------- lib/mq/exec/snapshotTick.ts | 2 +- lib/mq/init.ts | 75 ++++++++------ src/worker.ts | 2 +- test/db/snapshotSchedule.test.ts | 18 ---- 5 files changed, 143 insertions(+), 117 deletions(-) delete mode 100644 test/db/snapshotSchedule.test.ts diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 4377d94..bc8a039 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -2,7 +2,62 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; import { SnapshotScheduleType } from "./schema.d.ts"; import logger from "lib/log/logger.ts"; -import { MINUTE } from "$std/datetime/constants.ts"; +import { DAY, MINUTE } from "$std/datetime/constants.ts"; +import { redis } from "lib/db/redis.ts"; +import { Redis } from "ioredis"; + +const WINDOW_SIZE = 2880; // 每天 2880 个 5 分钟窗口 +const REDIS_KEY = "cvsa:snapshot_window_counts"; // Redis Key 名称 + +// 获取当前时间对应的窗口索引 +function getCurrentWindowIndex(): number { + const now = new Date(); + const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes(); + const currentWindow = Math.floor(minutesSinceMidnight / 5); + return currentWindow; +} + +// 刷新内存数组 +export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) { + const now = new Date(); + const startTime = now.getTime(); + + const result = await client.queryObject<{ window_start: Date; count: number }>` + SELECT + date_trunc('hour', started_at) + + (EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start, + COUNT(*) AS count + FROM snapshot_schedule + WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days' + GROUP BY 1 + ORDER BY window_start + ` + + await redisClient.del(REDIS_KEY); + + for (const row of result.rows) { + const offset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE)); + if (offset >= 0 && offset < WINDOW_SIZE) { + await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count)); + } + } +} + +export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) { + await refreshSnapshotWindowCounts(client, redisClient); + setInterval(async () => { + await refreshSnapshotWindowCounts(client, redisClient); + }, 5 * MINUTE); +} + +async function getWindowCount(redisClient: Redis, offset: number): Promise { + const count = await redisClient.hget(REDIS_KEY, offset.toString()); + return count ? parseInt(count, 10) : 0; +} + +async function updateWindowCount(redisClient: Redis, offset: number, increment: number): Promise { + await redisClient.hincrby(REDIS_KEY, offset.toString(), increment); +} export async function snapshotScheduleExists(client: Client, id: number) { const res = await client.queryObject<{ id: number }>( @@ -12,9 +67,6 @@ export async function snapshotScheduleExists(client: Client, id: number) { return res.rows.length > 0; } -/* - Returns true if the specified `aid` has at least one record with "pending" or "processing" status. -*/ export async function videoHasActiveSchedule(client: Client, aid: number) { const res = await client.queryObject<{ status: string }>( `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`, @@ -107,8 +159,10 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start: */ export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { if (await videoHasActiveSchedule(client, aid)) return; - const allowedCount = type === "milestone" ? 2000 : 800; - const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount); + let adjustedTime = new Date(targetTime); + if (type !== "milestone") { + adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); + } logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); return client.queryObject( `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, @@ -116,74 +170,51 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string ); } -/** - * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit - * @param client PostgreSQL client - * @param expectedStartTime The expected snapshot time - * @param allowedCounts The number of snapshots allowed in a 5-minute window (default: 2000) - * @returns The adjusted actual snapshot time within the first available window - */ export async function adjustSnapshotTime( - client: Client, expectedStartTime: Date, - allowedCounts: number = 2000, + allowedCounts: number = 1000, + redisClient: Redis, ): Promise { - // Query to find the closest available window by checking both past and future windows - const findWindowQuery = ` - WITH base AS ( - SELECT - date_trunc('minute', $1::timestamp) - - (EXTRACT(minute FROM $1::timestamp)::int % 5 * INTERVAL '1 minute') AS base_time - ), - offsets AS ( - SELECT generate_series(-100, 100) AS "offset" - ), - candidate_windows AS ( - SELECT - (base.base_time + ("offset" * INTERVAL '5 minutes')) AS window_start, - ABS("offset") AS distance - FROM base - CROSS JOIN offsets - ) - SELECT - window_start - FROM - candidate_windows cw - LEFT JOIN - snapshot_schedule s - ON - s.started_at >= cw.window_start - AND s.started_at < cw.window_start + INTERVAL '5 minutes' - AND s.status = 'pending' - GROUP BY - cw.window_start, cw.distance - HAVING - COUNT(s.*) < $2 - ORDER BY - cw.distance, cw.window_start - LIMIT 1; - `; + const currentWindow = getCurrentWindowIndex(); - try { - // Execute query to find the first available window - const windowResult = await client.queryObject<{ window_start: Date }>( - findWindowQuery, - [expectedStartTime, allowedCounts], - ); + // 计算目标窗口偏移量 + const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * 60 * 1000)); - // If no available window found, return original time (may exceed limit) - if (windowResult.rows.length === 0) { - return expectedStartTime; + // 在 Redis 中查找可用窗口 + for (let i = 0; i < WINDOW_SIZE; i++) { + const offset = (currentWindow + targetOffset + i) % WINDOW_SIZE; + const count = await getWindowCount(redisClient, offset); + + if (count < allowedCounts) { + // 找到可用窗口,更新计数 + await updateWindowCount(redisClient, offset, 1); + + // 计算具体时间 + const windowStart = new Date(Date.now() + offset * 5 * 60 * 1000); + const randomDelay = Math.floor(Math.random() * 5 * 60 * 1000); + return new Date(windowStart.getTime() + randomDelay); } + } - // Get the target window start time - const windowStart = windowResult.rows[0].window_start; + // 如果没有找到可用窗口,返回原始时间 + return expectedStartTime; +} - // Add random delay within the 5-minute window to distribute load - const randomDelay = Math.floor(Math.random() * 5 * MINUTE); - return new Date(windowStart.getTime() + randomDelay); - } catch { - return expectedStartTime; // Fallback to original time +export async function cleanupExpiredWindows(redisClient: Redis): Promise { + const now = new Date(); + const startTime = new Date(now.getTime() - 10 * DAY); // 保留最近 10 天的数据 + + // 获取所有窗口索引 + const allOffsets = await redisClient.hkeys(REDIS_KEY); + + // 删除过期窗口 + for (const offsetStr of allOffsets) { + const offset = parseInt(offsetStr, 10); + const windowStart = new Date(startTime.getTime() + offset * 5 * MINUTE); + + if (windowStart < startTime) { + await redisClient.hdel(REDIS_KEY, offsetStr); + } } } diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 7eb93e2..a9a377a 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -148,7 +148,7 @@ export const regularSnapshotsWorker = async (_job: Job) => { const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK); await scheduleSnapshot(client, aid, "normal", targetTime); if (now - startedAt > 25 * MINUTE) { - return; + return; } } } catch (e) { diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 8466280..b149e76 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -1,41 +1,54 @@ import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; import logger from "lib/log/logger.ts"; +import { initSnapshotWindowCounts } from "lib/db/snapshotSchedule.ts"; +import { db } from "lib/db/init.ts"; +import { redis } from "lib/db/redis.ts"; export async function initMQ() { - await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { - every: 1 * MINUTE, - immediately: true, - }); - await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { - every: 5 * MINUTE, - immediately: true, - }); - await LatestVideosQueue.upsertJobScheduler("collectSongs", { - every: 3 * MINUTE, - immediately: true, - }); - await SnapshotQueue.upsertJobScheduler("snapshotTick", { - every: 1 * SECOND, - immediately: true, - }, { - opts: { - removeOnComplete: 1, - removeOnFail: 1, - }, - }); + const client = await db.connect(); + try { + await initSnapshotWindowCounts(client, redis); - await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { - every: 5 * MINUTE, - immediately: true, - }); + await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { + every: 1 * MINUTE, + immediately: true, + }); - await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", { - every: 30 * MINUTE, - immediately: true, - }); + await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { + every: 5 * MINUTE, + immediately: true, + }); - await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick"); + await LatestVideosQueue.upsertJobScheduler("collectSongs", { + every: 3 * MINUTE, + immediately: true, + }); - logger.log("Message queue initialized."); + await SnapshotQueue.upsertJobScheduler("snapshotTick", { + every: 1 * SECOND, + immediately: true, + }, { + opts: { + removeOnComplete: 1, + removeOnFail: 1, + }, + }); + + await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { + every: 5 * MINUTE, + immediately: true, + }); + + await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", { + every: 30 * MINUTE, + immediately: true, + }); + + await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick"); + + logger.log("Message queue initialized."); + } finally { + client.release(); + } } diff --git a/src/worker.ts b/src/worker.ts index aa11c25..c2f4d7d 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -94,4 +94,4 @@ snapshotWorker.on("error", (err) => { snapshotWorker.on("closed", async () => { await lockManager.releaseLock("dispatchRegularSnapshots"); -}) +}); diff --git a/test/db/snapshotSchedule.test.ts b/test/db/snapshotSchedule.test.ts deleted file mode 100644 index a5e1d6a..0000000 --- a/test/db/snapshotSchedule.test.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert"; -import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts"; -import { postgresConfig } from "lib/db/pgConfig.ts"; -import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; - -Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => { - const client = new Client(postgresConfig); - try { - const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000)); - assertNotEquals(result, null); - const created_at = result!.created_at; - const views = result!.views; - assertInstanceOf(created_at, Date); - assertEquals(typeof views, "number"); - } finally { - client.end(); - } -});