Add bulk snapshot scheduling and a bulk snapshot worker.

What this patch does:
  * lib/db/snapshotSchedule.ts: adds bulkGetVideosWithoutProcessingSchedules
    (filters a list of aids down to those with no schedule in 'processing'
    status) and bulkScheduleSnapshot (calls scheduleSnapshot once per aid).
  * lib/mq/exec/snapshotTick.ts: snapshotTickWorker splits the due schedules
    into groups of 30 and queues one "bulkSnapshotVideo" job per group;
    takeBulkSnapshotForVideosWorker stores the fetched stats in video_snapshot
    and schedules the next regular snapshot for each video. Also contains
    formatting-only changes (semicolons, wrapped logger call).
  * src/worker.ts: routes "bulkSnapshotVideo" jobs to
    takeBulkSnapshotForVideosWorker.

Fixes applied during review:
  * The 'processing' filter previously matched any aid that had at least one
    non-processing schedule, so videos that were already being processed were
    still returned. It now groups by aid and rejects aids with a processing row.
  * Only schedules whose video passed that filter are marked and queued;
    previously the whole group was queued regardless of the filter result.
  * bulkSetSnapshotStatus is now given schedule ids in snapshotTickWorker, as
    it already was in takeBulkSnapshotForVideosWorker.
  * Schedules are marked 'completed' after a successful bulk snapshot.

Review notes:
  * NOTE(review): the existing per-schedule loop in snapshotTickWorker still
    runs over the same schedules after the bulk dispatch and returns
    ALREADY_PROCESSING when a video has a processing schedule. Confirm whether
    that loop should be removed or limited to schedules the bulk path skips.
  * NOTE(review): the post-image blob ids on the index lines of the first two
    files predate the review fixes. Indentation was reconstructed with tabs;
    regenerate the patch from the tree if the context does not apply.

diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts
index 158fef9..9c64d1d 100644
--- a/lib/db/snapshotSchedule.ts
+++ b/lib/db/snapshotSchedule.ts
@@ -84,6 +84,21 @@ export async function videoHasProcessingSchedule(client: Client, aid: number) {
 	return res.rows.length > 0;
 }
 
+/**
+ * Filters `aids` down to the videos that have schedules but none in `processing` status.
+ *
+ * @param client Database client used to run the query.
+ * @param aids Video IDs to check.
+ * @returns The subset of `aids` present in `snapshot_schedule` with no `processing` row.
+ */
+export async function bulkGetVideosWithoutProcessingSchedules(client: Client, aids: number[]) {
+	const res = await client.queryObject<{ aid: number }>(
+		`SELECT aid FROM snapshot_schedule WHERE aid = ANY($1) GROUP BY aid HAVING COUNT(*) FILTER (WHERE status = 'processing') = 0`,
+		[aids],
+	);
+	return res.rows.map((row) => row.aid);
+}
+
 interface Snapshot {
 	created_at: number;
 	views: number;
@@ -196,6 +211,21 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
 	);
 }
 
+/**
+ * Schedules one snapshot per video, sequentially, by calling `scheduleSnapshot` for each aid.
+ *
+ * @param client Database client passed through to `scheduleSnapshot`.
+ * @param aids Video IDs to schedule.
+ * @param type Schedule type passed through to `scheduleSnapshot`.
+ * @param targetTime Target time passed through to `scheduleSnapshot`.
+ * @param force Passed through to `scheduleSnapshot`; defaults to `false`.
+ */
+export async function bulkScheduleSnapshot(client: Client, aids: number[], type: string, targetTime: number, force: boolean = false) {
+	for (const aid of aids) {
+		await scheduleSnapshot(client, aid, type, targetTime, force);
+	}
+}
+
 export async function adjustSnapshotTime(
 	expectedStartTime: Date,
 	allowedCounts: number = 1000,
diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts
index f29dfce..3f1203b 100644
--- a/lib/mq/exec/snapshotTick.ts
+++ b/lib/mq/exec/snapshotTick.ts
@@ -2,6 +2,8 @@ import { Job } from "bullmq";
 import { db } from "lib/db/init.ts";
 import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
 import {
+	bulkGetVideosWithoutProcessingSchedules,
+	bulkScheduleSnapshot,
 	bulkSetSnapshotStatus,
 	findClosestSnapshot,
 	findSnapshotBefore,
@@ -41,6 +43,27 @@ export const snapshotTickWorker = async (_job: Job) => {
 	const client = await db.connect();
 	try {
 		const schedules = await getSnapshotsInNextSecond(client);
+		const count = schedules.length;
+		const groups = Math.ceil(count / 30);
+		for (let i = 0; i < groups; i++) {
+			const group = schedules.slice(i * 30, (i + 1) * 30);
+			const aids = group.map((schedule) => schedule.aid);
+			// Number() normalizes aids on both sides so the lookup does not depend on the driver's integer type.
+			const idleAids = new Set((await bulkGetVideosWithoutProcessingSchedules(client, aids)).map(Number));
+			// Dispatch only schedules whose video has no schedule in processing status.
+			const pending = group.filter((schedule) => idleAids.has(Number(schedule.aid)));
+			if (pending.length === 0) continue;
+			// NOTE(review): passes schedule ids, matching the bulkSetSnapshotStatus(client, ids, "failed") call
+			// in takeBulkSnapshotForVideosWorker; confirm against the helper's definition.
+			await bulkSetSnapshotStatus(client, pending.map((schedule) => schedule.id), "processing");
+			const dataMap: { [key: number]: number } = {};
+			for (const schedule of pending) {
+				dataMap[schedule.id] = schedule.aid;
+			}
+			await SnapshotQueue.add("bulkSnapshotVideo", {
+				map: dataMap,
+			}, { priority: 3 });
+		}
 		for (const schedule of schedules) {
 			if (await videoHasProcessingSchedule(client, schedule.aid)) {
 				return `ALREADY_PROCESSING`;
@@ -174,7 +197,7 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 			const now = Date.now();
 			const lastSnapshotedAt = latestSnapshot?.time ?? now;
 			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
+			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
 			const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
 			await scheduleSnapshot(client, aid, "normal", targetTime);
 			if (now - startedAt > 25 * MINUTE) {
@@ -190,7 +213,13 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 };
 
+/**
+ * Takes snapshots for every schedule listed in `job.data.map` (schedule id -> aid) using one bulk
+ * stats request, stores them in `video_snapshot`, then schedules the next regular snapshot per video.
+ *
+ * @returns `DONE` on success, or `GET_BILI_STATUS_<code>` when the stats request yields a numeric code.
+ */
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const dataMap: {[key: number]: number} = job.data.map;
+	const dataMap: { [key: number]: number } = job.data.map;
 	const ids = Object.keys(dataMap).map((id) => Number(id));
 	const aidsToFetch: number[] = [];
 	const client = await db.connect();
@@ -199,20 +228,50 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			const aid = Number(dataMap[id]);
 			const exists = await snapshotScheduleExists(client, id);
 			if (!exists) {
-				continue
+				continue;
 			}
 			aidsToFetch.push(aid);
 		}
 		const data = await bulkGetVideoStats(aidsToFetch);
 		if (typeof data === "number") {
 			await bulkSetSnapshotStatus(client, ids, "failed");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
 			return `GET_BILI_STATUS_${data}`;
 		}
-	}
-	finally {
+		for (const video of data) {
+			const aid = video.id;
+			const stat = video.cnt_info;
+			const views = stat.play;
+			const danmakus = stat.danmaku;
+			const replies = stat.reply;
+			const likes = stat.thumb_up;
+			const coins = stat.coin;
+			const shares = stat.share;
+			const favorites = stat.collect;
+			const query: string = `
+				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
+				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+			`;
+			await client.queryObject(
+				query,
+				[aid, views, danmakus, replies, likes, coins, shares, favorites],
+			);
+
+			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
+		}
+		// Mark the handled schedules as finished before queueing the follow-ups, mirroring
+		// takeSnapshotForVideoWorker; otherwise they stay in processing.
+		await bulkSetSnapshotStatus(client, ids, "completed");
+		for (const aid of aidsToFetch) {
+			const interval = await getRegularSnapshotInterval(client, aid);
+			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
+		}
+		return `DONE`;
+	} finally {
 		client.release();
 	}
-}
+};
 
 export const takeSnapshotForVideoWorker = async (job: Job) => {
 	const id = job.data.id;
@@ -242,7 +301,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
 		await setSnapshotStatus(client, id, "completed");
 		if (type === "normal") {
 			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
+			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
 			await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
 			return `DONE`;
 		} else if (type === "new") {
@@ -309,7 +368,11 @@ export const scheduleCleanupWorker = async (_job: Job) => {
 			const type = row.type;
 			await setSnapshotStatus(client, id, "timeout");
 			await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
-			logger.log(`Schedule ${id} has no response received for 5 minutes, rescheduled.`, "mq", "fn:scheduleCleanupWorker")
+			logger.log(
+				`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
+				"mq",
+				"fn:scheduleCleanupWorker",
+			);
 		}
 	} catch (e) {
 		logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
diff --git a/src/worker.ts b/src/worker.ts
index c8cbb6d..883ec74 100644
--- a/src/worker.ts
+++ b/src/worker.ts
@@ -10,7 +10,8 @@ import {
 	regularSnapshotsWorker,
 	snapshotTickWorker,
 	takeSnapshotForVideoWorker,
-	scheduleCleanupWorker
+	scheduleCleanupWorker,
+	takeBulkSnapshotForVideosWorker
 } from "lib/mq/exec/snapshotTick.ts";
 
 Deno.addSignalListener("SIGINT", async () => {
@@ -84,6 +85,9 @@ const snapshotWorker = new Worker(
 			case "scheduleCleanup":
 				await scheduleCleanupWorker(job);
 				break;
+			case "bulkSnapshotVideo":
+				await takeBulkSnapshotForVideosWorker(job);
+				break;
 			default:
 				break;
 		}