From 7ac2d2c21753d9f589b157b4f100a5565a302b6a Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 9 Mar 2025 19:36:19 +0800 Subject: [PATCH] update: completed snapshot for videos close to milestone --- lib/db/snapshot.ts | 84 +++++++++++++++++++++++++++++++++++-- lib/log/logger.ts | 7 +++- lib/mq/exec/snapshotTick.ts | 72 +++++++++++++++++++------------ lib/mq/scheduler.ts | 4 ++ lib/utils/formatSeconds.ts | 9 ++++ src/worker.ts | 6 +-- 6 files changed, 146 insertions(+), 36 deletions(-) create mode 100644 lib/utils/formatSeconds.ts diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index 9002894..0b46b5e 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -1,5 +1,7 @@ +import { DAY, SECOND } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { VideoSnapshotType } from "lib/db/schema.d.ts"; +import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; export async function getSongsNearMilestone(client: Client) { const queryResult = await client.queryObject(` @@ -42,16 +44,92 @@ export async function getSongsNearMilestone(client: Client) { return { ...row, aid: Number(row.aid), - } + }; }); } export async function getUnsnapshotedSongs(client: Client) { - const queryResult = await client.queryObject<{aid: bigint}>(` + const queryResult = await client.queryObject<{ aid: bigint }>(` SELECT DISTINCT s.aid FROM songs s LEFT JOIN video_snapshot v ON s.aid = v.aid WHERE v.aid IS NULL; `); - return queryResult.rows.map(row => Number(row.aid)); + return queryResult.rows.map((row) => Number(row.aid)); +} + +export async function getSongSnapshotCount(client: Client, aid: number) { + const queryResult = await client.queryObject<{ count: number }>(` + SELECT COUNT(*) AS count + FROM video_snapshot + WHERE aid = $1; + `, [aid]); + return queryResult.rows[0].count; +} + +export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) { + const count = await getSongSnapshotCount(client, aid); + if (count < 2) { + return true; + } + const queryResult = await client.queryObject<{ views1: number, created_at1: string, views2: number, created_at2: string }>( + ` + WITH latest_snapshot AS ( + SELECT + aid, + views, + created_at + FROM video_snapshot + WHERE aid = $1 + ORDER BY created_at DESC + LIMIT 1 + ), + pairs AS ( + SELECT + a.views AS views1, + a.created_at AS created_at1, + b.views AS views2, + b.created_at AS created_at2, + (b.created_at - a.created_at) AS interval + FROM video_snapshot a + JOIN latest_snapshot b + ON a.aid = b.aid + AND a.created_at < b.created_at + ) + SELECT + views1, + created_at1, + views2, + created_at2 + FROM ( + SELECT + *, + ROW_NUMBER() OVER ( + ORDER BY + CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END, + CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END + ) AS rn + FROM pairs + ) ranked + WHERE rn = 1; + `, + [aid], + ); + if (queryResult.rows.length === 0) { + return true; + } + const recentViewsData = queryResult.rows[0]; + const time1 = parseTimestampFromPsql(recentViewsData.created_at1); + const time2 = parseTimestampFromPsql(recentViewsData.created_at2); + const intervalSec = (time2 - time1) / SECOND; + const views1 = recentViewsData.views1; + const views2 = recentViewsData.views2; + const viewsDiff = views2 - views1; + if (viewsDiff == 0) { + return false; + } + const nextMilestone = views2 >= 100000 ? 1000000 : 100000; + const expectedViewsDiff = nextMilestone - views2; + const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec; + return expectedIntervalSec <= 3 * DAY; } diff --git a/lib/log/logger.ts b/lib/log/logger.ts index bbaafb8..10a741a 100644 --- a/lib/log/logger.ts +++ b/lib/log/logger.ts @@ -52,7 +52,7 @@ const createTransport = (level: string, filename: string) => { }); }; -const verboseLogPath = Deno.env.get("LOG_VERBOSE") ?? "logs/verbose.log"; +const sillyLogPath = Deno.env.get("LOG_VERBOSE") ?? "logs/verbose.log"; const warnLogPath = Deno.env.get("LOG_WARN") ?? "logs/warn.log"; const errorLogPath = Deno.env.get("LOG_ERROR") ?? "logs/error.log"; @@ -68,13 +68,16 @@ const winstonLogger = winston.createLogger({ customFormat, ), }), - createTransport("verbose", verboseLogPath), + createTransport("silly", sillyLogPath), createTransport("warn", warnLogPath), createTransport("error", errorLogPath), ], }); const logger = { + silly: (message: string, service?: string, codePath?: string) => { + winstonLogger.silly(message, { service, codePath }); + }, verbose: (message: string, service?: string, codePath?: string) => { winstonLogger.verbose(message, { service, codePath }); }, diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index ad4faa4..c7581b4 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -1,13 +1,15 @@ import { Job } from "bullmq"; import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { db } from "lib/db/init.ts"; -import { getSongsNearMilestone, getUnsnapshotedSongs } from "lib/db/snapshot.ts"; +import { getSongsNearMilestone, getUnsnapshotedSongs, songEligibleForMilestoneSnapshot } from "lib/db/snapshot.ts"; import { SnapshotQueue } from "lib/mq/index.ts"; import { insertVideoStats } from "lib/mq/task/getVideoStats.ts"; import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; import { redis } from "lib/db/redis.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts"; import logger from "lib/log/logger.ts"; +import { formatSeconds } from "lib/utils/formatSeconds.ts"; async function snapshotScheduled(aid: number) { try { @@ -43,19 +45,24 @@ interface SongNearMilestone { replies: number; } -async function processMilestoneSnapshots(vidoesNearMilestone: SongNearMilestone[]) { +async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) { let i = 0; for (const snapshot of vidoesNearMilestone) { if (await snapshotScheduled(snapshot.aid)) { + logger.silly(`Video ${snapshot.aid} is already scheduled for snapshot`, "mq", "fn:processMilestoneSnapshots"); + continue; + } + if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) { + logger.silly(`Video ${snapshot.aid} is not eligible for milestone snapshot`, "mq", "fn:processMilestoneSnapshots"); continue; } const factor = Math.floor(i / 8); const delayTime = factor * SECOND * 2; - SnapshotQueue.add("snapshotMilestoneVideo", { + await SnapshotQueue.add("snapshotMilestoneVideo", { aid: snapshot.aid, currentViews: snapshot.views, snapshotedAt: snapshot.created_at, - }, { delay: delayTime }); + }, { delay: delayTime, priority: 1 }); await setSnapshotScheduled(snapshot.aid, true, 20 * 60); i++; } @@ -65,13 +72,14 @@ async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) { let i = 0; for (const aid of unsnapshotedVideos) { if (await snapshotScheduled(aid)) { + logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos"); continue; } const factor = Math.floor(i / 5); const delayTime = factor * SECOND * 4; - SnapshotQueue.add("snapshotVideo", { + await SnapshotQueue.add("snapshotVideo", { aid, - }, { delay: delayTime }); + }, { delay: delayTime, priority: 3 }); await setSnapshotScheduled(aid, true, 6 * 60 * 60); i++; } @@ -81,7 +89,7 @@ export const snapshotTickWorker = async (_job: Job) => { const client = await db.connect(); try { const vidoesNearMilestone = await getSongsNearMilestone(client); - await processMilestoneSnapshots(vidoesNearMilestone); + await processMilestoneSnapshots(client, vidoesNearMilestone); const unsnapshotedVideos = await getUnsnapshotedSongs(client); await processUnsnapshotedVideos(unsnapshotedVideos); @@ -94,30 +102,42 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { const client = await db.connect(); await setSnapshotScheduled(job.data.aid, true, 20 * 60); try { - const { aid, currentViews, lastSnapshoted } = job.data; + const { aid, currentViews, snapshotedAt } = job.data; + const lastSnapshoted = snapshotedAt; const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo"); if (stat == null) { - setSnapshotScheduled(aid, false, 0); + await setSnapshotScheduled(aid, false, 0); return; } const nextMilestone = currentViews >= 100000 ? 1000000 : 100000; if (stat.views >= nextMilestone) { - setSnapshotScheduled(aid, false, 0); + await setSnapshotScheduled(aid, false, 0); return; } + const DELTA = 0.001; const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; const viewsIncrement = stat.views - currentViews; - const incrementSpeed = viewsIncrement / intervalSeconds; + const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA); const viewsToIncrease = nextMilestone - stat.views; - const eta = viewsToIncrease / incrementSpeed; + const eta = viewsToIncrease / (incrementSpeed + DELTA); const scheduledNextSnapshotDelay = eta * SECOND / 3; const maxInterval = 20 * MINUTE; const delay = Math.min(scheduledNextSnapshotDelay, maxInterval); - SnapshotQueue.add("snapshotMilestoneVideo", { + await SnapshotQueue.add("snapshotMilestoneVideo", { aid, currentViews: stat.views, snapshotedAt: stat.time, - }, { delay }); + }, { delay, priority: 1}); + await job.updateData({ + ...job.data, + updatedViews: stat.views, + updatedTime: new Date(stat.time).toISOString(), + etaInMins: eta / 60, + }); + logger.log( + `Scheduled next milestone snapshot for ${aid} in ${formatSeconds(delay / 1000)}, current views: ${stat.views}`, + "mq", + ); } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { logger.warn( @@ -125,11 +145,11 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { "mq", "fn:takeSnapshotForMilestoneVideoWorker", ); - SnapshotQueue.add("snapshotMilestoneVideo", { + await SnapshotQueue.add("snapshotMilestoneVideo", { aid: job.data.aid, currentViews: job.data.currentViews, snapshotedAt: job.data.snapshotedAt, - }, { delay: 5 * SECOND }); + }, { delay: 5 * SECOND, priority: 1 }); return; } throw e; @@ -144,29 +164,29 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { try { const { aid } = job.data; const stat = await insertVideoStats(client, aid, "getVideoInfo"); + logger.log(`Taken snapshot for ${aid}`, "mq"); if (stat == null) { setSnapshotScheduled(aid, false, 0); return; } + await job.updateData({ + ...job.data, + updatedViews: stat.views, + updatedTime: new Date(stat.time).toISOString(), + }); const nearMilestone = (stat.views >= 90000 && stat.views < 100000) || (stat.views >= 900000 && stat.views < 1000000); if (nearMilestone) { - SnapshotQueue.add("snapshotMilestoneVideo", { + await SnapshotQueue.add("snapshotMilestoneVideo", { aid, currentViews: stat.views, snapshotedAt: stat.time, - }, { delay: 0 }); + }, { delay: 0, priority: 1 }); } + await setSnapshotScheduled(aid, false, 0); } catch (e) { if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { - logger.warn( - `No available proxy for aid ${job.data.aid}.`, - "mq", - "fn:takeSnapshotForMilestoneVideoWorker", - ); - SnapshotQueue.add("snapshotVideo", { - aid: job.data.aid, - }, { delay: 10 * SECOND }); + await setSnapshotScheduled(job.data.aid, false, 0); return; } throw e; diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index aa2d11f..2b7d7ce 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -311,6 +311,10 @@ const biliLimiterConfig: RateLimiterConfig[] = [ window: new SlidingWindow(redis, 1), max: 6, }, + { + window: new SlidingWindow(redis, 5), + max: 20, + }, { window: new SlidingWindow(redis, 30), max: 100, diff --git a/lib/utils/formatSeconds.ts b/lib/utils/formatSeconds.ts new file mode 100644 index 0000000..ffabb22 --- /dev/null +++ b/lib/utils/formatSeconds.ts @@ -0,0 +1,9 @@ +export const formatSeconds = (seconds: number) => { + if (seconds < 60) { + return `${(seconds).toFixed(1)}s`; + } + if (seconds < 3600) { + return `${Math.floor(seconds / 60)}m${seconds % 60}s`; + } + return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`; +}; diff --git a/src/worker.ts b/src/worker.ts index 51d3bf8..fd68339 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -69,13 +69,9 @@ const snapshotWorker = new Worker( break; } }, - { connection: redis, concurrency: 20, removeOnComplete: { count: 1440 } }, + { connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } }, ); -snapshotWorker.on("active", () => { - logger.log("Worker (snapshot) activated.", "mq"); -}) - snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath);