diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 620df60..c056665 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -256,13 +256,25 @@ export async function getSnapshotsInNextSecond(client: Client) { const query = ` SELECT * FROM snapshot_schedule - WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' + WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' ORDER BY CASE WHEN type = 'milestone' THEN 0 ELSE 1 END, started_at + LIMIT 10; + `; + const res = await client.queryObject(query, []); + return res.rows; +} + +export async function getBulkSnapshotsInNextSecond(client: Client) { + const query = ` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal' + ORDER BY started_at LIMIT 100; `; const res = await client.queryObject(query, []); diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index a4aa147..319a20c 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -15,6 +15,7 @@ import { setSnapshotStatus, snapshotScheduleExists, videoHasProcessingSchedule, + getBulkSnapshotsInNextSecond } from "lib/db/snapshotSchedule.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts"; @@ -39,10 +40,10 @@ const snapshotTypeToTaskMap: { [key: string]: string } = { "new": "snapshotMilestoneVideo", }; -export const snapshotTickWorker = async (_job: Job) => { +export const bulkSnapshotTickWorker = async (_job: Job) => { const client = await db.connect(); try { - const schedules = await getSnapshotsInNextSecond(client); + const schedules = await getBulkSnapshotsInNextSecond(client); const count = schedules.length; const groups = Math.ceil(count / 30); for (let i = 0; i < groups; i++) { @@ -60,6 +61,17 @@ export const snapshotTickWorker = async (_job: Job) => { map: dataMap, }, { priority: 3 }); } + } catch (e) { + logger.error(e as Error); + } finally { + client.release(); + } +}; + +export const snapshotTickWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const schedules = await getSnapshotsInNextSecond(client); for (const schedule of schedules) { if (await videoHasProcessingSchedule(client, Number(schedule.aid))) { return `ALREADY_PROCESSING`; diff --git a/lib/mq/init.ts b/lib/mq/init.ts index e416988..d408f8e 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -35,6 +35,16 @@ export async function initMQ() { }, }); + await SnapshotQueue.upsertJobScheduler("bulkSnapshotTick", { + every: 15 * SECOND, + immediately: true, + }, { + opts: { + removeOnComplete: 1, + removeOnFail: 1, + }, + }); + await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { every: 5 * MINUTE, immediately: true, diff --git a/src/worker.ts b/src/worker.ts index 883ec74..fae7b6a 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -11,7 +11,8 @@ import { snapshotTickWorker, takeSnapshotForVideoWorker, scheduleCleanupWorker, - takeBulkSnapshotForVideosWorker + takeBulkSnapshotForVideosWorker, + bulkSnapshotTickWorker } from "lib/mq/exec/snapshotTick.ts"; Deno.addSignalListener("SIGINT", async () => { @@ -88,6 +89,9 @@ const snapshotWorker = new Worker( case "bulkSnapshotVideo": await takeBulkSnapshotForVideosWorker(job); break; + case "bulkSnapshotTick": + await bulkSnapshotTickWorker(job); + break; default: break; }