diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index e77f225..4e6e244 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -217,7 +217,7 @@ export async function getSnapshotsInNextSecond(client: Client) { ELSE 1 END, started_at - LIMIT 3; + LIMIT 10; `; const res = await client.queryObject(query, []); return res.rows; diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index f14ebfd..b96083b 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -31,7 +31,7 @@ const priorityMap: { [key: string]: number } = { const snapshotTypeToTaskMap: { [key: string]: string } = { "milestone": "snapshotMilestoneVideo", "normal": "snapshotVideo", - "new": "snapshotMilestoneVideo" + "new": "snapshotMilestoneVideo", }; export const snapshotTickWorker = async (_job: Job) => { @@ -134,7 +134,7 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { const getRegularSnapshotInterval = async (client: Client, aid: number) => { const now = Date.now(); - const date = new Date(now - 24 * HOUR); + const date = new Date(now - 24 * HOUR); const oldSnapshot = await findClosestSnapshot(client, aid, date); const latestSnapshot = await getLatestSnapshot(client, aid); if (!oldSnapshot || !latestSnapshot) return 0; @@ -148,7 +148,7 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => { if (speedPerDay < 120) return 24; if (speedPerDay < 320) return 12; return 6; -} +}; export const regularSnapshotsWorker = async (_job: Job) => { const client = await db.connect(); @@ -207,26 +207,25 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { const interval = await getRegularSnapshotInterval(client, aid); await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR); return `DONE`; - } - else if (type === "new") { + } else if (type === "new") { const publihsedAt = await getSongsPublihsedAt(client, aid); const timeSincePublished = stat.time - publihsedAt!; const viewsPerHour = stat.views / timeSincePublished * HOUR; if (timeSincePublished > 48 * HOUR) { - return `DONE` + return `DONE`; } if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) { - return `DONE` + return `DONE`; } let intervalMins = 240; if (viewsPerHour > 50) { - intervalMins = 120; + intervalMins = 120; } if (viewsPerHour > 100) { - intervalMins = 60; + intervalMins = 60; } if (viewsPerHour > 1000) { - intervalMins = 15; + intervalMins = 15; } await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE); } @@ -254,3 +253,28 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { client.release(); } }; + +export const scheduleCleanupWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const query = ` + SELECT id, aid, type + FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '5 minutes' + `; + const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query); + if (rows.length === 0) return; + for (const row of rows) { + const id = Number(row.id); + const aid = Number(row.aid); + const type = row.type; + await setSnapshotStatus(client, id, "timeout"); + await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND); + } + } catch (e) { + logger.error(e as Error, "mq", "fn:scheduleCleanupWorker"); + } finally { + client.release(); + } +}; diff --git a/src/worker.ts b/src/worker.ts index c2f4d7d..ad0c9b6 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -10,6 +10,7 @@ import { regularSnapshotsWorker, snapshotTickWorker, takeSnapshotForVideoWorker, + scheduleCleanupWorker } from "lib/mq/exec/snapshotTick.ts"; Deno.addSignalListener("SIGINT", async () => { @@ -80,6 +81,9 @@ const snapshotWorker = new Worker( case "dispatchRegularSnapshots": await regularSnapshotsWorker(job); break; + case "scheduleCleanup": + await scheduleCleanupWorker(job); + break; default: break; }