diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index 6fcfdc1..d10340d 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -63,18 +63,6 @@ export async function snapshotScheduleExists(sql: Psql, id: number) { return rows.length > 0; } -export async function videoHasActiveSchedule(sql: Psql, aid: number) { - const rows = await sql<{ status: string }[]>` - SELECT status - FROM snapshot_schedule - WHERE aid = ${aid} - AND (status = 'pending' - OR status = 'processing' - ) - ` - return rows.length > 0; -} - export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) { const rows = await sql<{ status: string }[]>` SELECT status FROM snapshot_schedule @@ -292,23 +280,23 @@ export async function adjustSnapshotTime( } export async function getSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` - SELECT * - FROM snapshot_schedule - WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' - ORDER BY - CASE - WHEN type = 'milestone' THEN 0 - ELSE 1 - END, - started_at - LIMIT 10; - ` - return rows; + return sql` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '1 seconds' + AND status = 'pending' + AND type != 'normal' + ORDER BY CASE + WHEN type = 'milestone' THEN 0 + ELSE 1 + END, + started_at + LIMIT 10; + `; } export async function getBulkSnapshotsInNextSecond(sql: Psql) { - const rows = await sql` + return sql` SELECT * FROM snapshot_schedule WHERE (started_at <= NOW() + INTERVAL '15 seconds') @@ -320,27 +308,33 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) { END, started_at LIMIT 1000; - ` - return rows; + `; } export async function setSnapshotStatus(sql: Psql, id: number, status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id} + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ${id} `; } export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) { - return await sql` - UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids}) + return sql` + UPDATE snapshot_schedule + SET status = ${status} + WHERE id = ANY (${ids}) `; } -export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) { +export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) { const rows = await sql<{ aid: string }[]>` SELECT s.aid FROM songs s - LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + LEFT JOIN snapshot_schedule ss ON + s.aid = ss.aid AND + (ss.status = 'pending' OR ss.status = 'processing') AND + ss.type = ${type} WHERE ss.aid IS NULL `; return rows.map((r) => Number(r.aid)); diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 2cbcd08..b17472b 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { if (eta > 144) continue; const now = Date.now(); const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 1 * HOUR; - const minInterval = 1 * SECOND; + const maxInterval = 1.2 * HOUR; + const minInterval = 2 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; await scheduleSnapshot(sql, aid, "milestone", targetTime); diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index 49a7893..3c4feb7 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -1,7 +1,10 @@ import { Job } from "bullmq"; import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { truncate } from "utils/truncate.ts"; -import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { + getVideosWithoutActiveSnapshotScheduleByType, + scheduleSnapshot +} from "db/snapshotSchedule.ts"; import logger from "@core/log/logger.ts"; import { HOUR, MINUTE, WEEK } from "@core/const/time.ts"; import { lockManager } from "@core/mq/lockManager.ts"; @@ -17,7 +20,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise = } await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); - const aids = await getVideosWithoutActiveSnapshotSchedule(sql); + const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal"); for (const rawAid of aids) { const aid = Number(rawAid); const latestSnapshot = await getLatestVideoSnapshot(sql, aid); diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index 59f05db..e638974 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -78,8 +78,9 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`, "mq", - "fn:dispatchRegularSnapshotsWorker", + "fn:snapshotVideoWorker", ); + return; } const now = Date.now(); const targetTime = now + eta * HOUR; @@ -92,7 +93,7 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `No available proxy for aid ${job.data.aid}.`, "mq", - "fn:takeSnapshotForVideoWorker", + "fn:snapshotVideoWorker", ); await setSnapshotStatus(sql, id, "no_proxy"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); @@ -102,16 +103,12 @@ export const snapshotVideoWorker = async (job: Job): Promise => { logger.warn( `Failed to proxy request for aid ${job.data.aid}: ${e.message}`, "mq", - "fn:takeSnapshotForVideoWorker", + "fn:snapshotVideoWorker", ); await setSnapshotStatus(sql, id, "failed"); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); } - logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + logger.error(e as Error, "mq", "fn:snapshotVideoWorker"); await setSnapshotStatus(sql, id, "failed"); } - finally { - await lockManager.releaseLock("dispatchRegularSnapshots"); - }; - return; };