diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts
index fa56f44..7fb4080 100644
--- a/packages/crawler/db/snapshotSchedule.ts
+++ b/packages/crawler/db/snapshotSchedule.ts
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
 import { MINUTE } from "$std/datetime/constants.ts";
-import { redis } from "../../core/db/redis.ts";
+import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";
 
 const REDIS_KEY = "cvsa:snapshot_window_counts";
@@ -272,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) {
 
 export async function getBulkSnapshotsInNextSecond(client: Client) {
 	const query = `
-		SELECT *
-		FROM snapshot_schedule
-		WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
-		ORDER BY started_at
-		LIMIT 1000;
+		SELECT *
+		FROM snapshot_schedule
+		WHERE (started_at <= NOW() + INTERVAL '15 seconds')
+			AND status = 'pending'
+			AND (type = 'normal' OR type = 'archive')
+		ORDER BY CASE
+			WHEN type = 'normal' THEN 1
+			WHEN type = 'archive' THEN 2
+		END,
+			started_at
+		LIMIT 1000;
 	`;
 	const res = await client.queryObject(query, []);
 	return res.rows;
@@ -306,3 +312,15 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
 	const res = await client.queryObject<{ aid: number }>(query, []);
 	return res.rows.map((r) => Number(r.aid));
 }
+
+export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
+	const query: string = `
+		SELECT s.aid
+		FROM bilibili_metadata s
+		LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
+		WHERE ss.aid IS NULL
+	`;
+	const res = await client.queryObject<{ aid: number }>(query, []);
+	return res.rows.map((r) => Number(r.aid));
+}
+
diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 39f9ca1..9855b1a 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -6,7 +6,7 @@ import {
 	bulkScheduleSnapshot,
 	bulkSetSnapshotStatus,
 	findClosestSnapshot,
-	findSnapshotBefore,
+	findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
 	getBulkSnapshotsInNextSecond,
 	getLatestSnapshot,
 	getSnapshotsInNextSecond,
@@ -28,6 +28,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { getSongsPublihsedAt } from "db/songs.ts";
 import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
 import { getAdjustedShortTermETA } from "../scheduling.ts";
+import {SnapshotScheduleType} from "@core/db/schema";
 
 const priorityMap: { [key: string]: number } = {
 	"milestone": 1,
@@ -52,13 +53,8 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
 			const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
 			if (filteredAids.length === 0) continue;
 			await bulkSetSnapshotStatus(client, filteredAids, "processing");
-			const dataMap: { [key: number]: number } = {};
-			for (const schedule of group) {
-				const id = Number(schedule.id);
-				dataMap[id] = Number(schedule.aid);
-			}
 			await SnapshotQueue.add("bulkSnapshotVideo", {
-				map: dataMap,
+				schedules: group,
 			}, { priority: 3 });
 		}
 		return `OK`
@@ -146,6 +142,38 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
 	return 6;
 };
 
+export const archiveSnapshotsWorker = async (_job: Job) => {
+	const client = await db.connect();
+	const startedAt = Date.now();
+	if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
+		logger.log("dispatchArchiveSnapshots is already running", "mq");
+		client.release();
+		return;
+	}
+	await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
+	try {
+		const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
+		for (const rawAid of aids) {
+			const aid = Number(rawAid);
+			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+			const now = Date.now();
+			const lastSnapshotedAt = latestSnapshot?.time ?? now;
+			const interval = 168;
+			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+			const targetTime = lastSnapshotedAt + interval * HOUR;
+			await scheduleSnapshot(client, aid, "archive", targetTime);
+			if (now - startedAt > 250 * MINUTE) {
+				return;
+			}
+		}
+	} catch (e) {
+		logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
+	} finally {
+		await lockManager.releaseLock("dispatchArchiveSnapshots");
+		client.release();
+	}
+};
+
 export const regularSnapshotsWorker = async (_job: Job) => {
 	const client = await db.connect();
 	const startedAt = Date.now();
@@ -179,13 +207,14 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 };
 
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const dataMap: { [key: number]: number } = job.data.map;
-	const ids = Object.keys(dataMap).map((id) => Number(id));
+	const schedules: SnapshotScheduleType[] = job.data.schedules;
+	const ids = schedules.map((schedule) => Number(schedule.id));
 	const aidsToFetch: number[] = [];
 	const client = await db.connect();
 	try {
-		for (const id of ids) {
-			const aid = Number(dataMap[id]);
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const id = Number(schedule.id);
 			const exists = await snapshotScheduleExists(client, id);
 			if (!exists) {
 				continue;
@@ -220,7 +249,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
 		}
 		await bulkSetSnapshotStatus(client, ids, "completed");
-		for (const aid of aidsToFetch) {
+
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const type = schedule.type;
+			if (type == 'archive') continue;
 			const interval = await getRegularSnapshotInterval(client, aid);
 			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
 			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts
index 9db7afc..18703b7 100644
--- a/packages/crawler/mq/init.ts
+++ b/packages/crawler/mq/init.ts
@@ -1,4 +1,4 @@
-import { MINUTE, SECOND } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
@@ -55,6 +55,11 @@ export async function initMQ() {
 		immediately: true,
 	});
 
+	await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
+		every: 6 * HOUR,
+		immediately: true,
+	});
+
 	await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
 		every: 30 * MINUTE,
 		immediately: true,
diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts
index cde3f89..9c5a902 100644
--- a/packages/crawler/src/worker.ts
+++ b/packages/crawler/src/worker.ts
@@ -6,6 +6,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { WorkerError } from "mq/schema.ts";
 import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
 import {
+	archiveSnapshotsWorker,
 	bulkSnapshotTickWorker,
 	collectMilestoneSnapshotsWorker,
 	regularSnapshotsWorker,
@@ -82,6 +83,8 @@ const snapshotWorker = new Worker(
 				return await takeBulkSnapshotForVideosWorker(job);
 			case "bulkSnapshotTick":
 				return await bulkSnapshotTickWorker(job);
+			case "dispatchArchiveSnapshots":
+				return await archiveSnapshotsWorker(job);
 			default:
 				break;
 		}