From da1bea7f410910a0f11d3543a1068add136fb004 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 7 Apr 2025 00:28:28 +0800 Subject: [PATCH] fix: unlimited addition of milestone tasks --- packages/crawler/db/snapshotSchedule.ts | 10 +++++++++- packages/crawler/mq/exec/snapshotTick.ts | 4 +++- packages/crawler/mq/init.ts | 8 ++++---- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index b8aec48..f06d0c9 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -71,6 +71,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) { return res.rows.length > 0; } +export async function videoHasActiveScheduleWithType(client: Client, aid: number, type: string) { + const res = await client.queryObject<{ status: string }>( + `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing') AND type = $2`, + [aid, type], + ); + return res.rows.length > 0; +} + export async function videoHasProcessingSchedule(client: Client, aid: number) { const res = await client.queryObject<{ status: string }>( `SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`, @@ -193,7 +201,7 @@ export async function scheduleSnapshot( targetTime: number, force: boolean = false, ) { - if (await videoHasActiveSchedule(client, aid) && !force) return; + if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return; let adjustedTime = new Date(targetTime); if (type !== "milestone" && type !== "new") { adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index f12db84..215dd70 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -61,6 +61,7 @@ export const bulkSnapshotTickWorker = async (_job: Job) => { map: dataMap, }, { priority: 3 }); } + return `OK` } catch (e) { logger.error(e as Error); } finally { @@ -87,6 +88,7 @@ export const snapshotTickWorker = async (_job: Job) => { id: Number(schedule.id), type: schedule.type ?? "normal", }, { priority }); + return `OK`; } } catch (e) { logger.error(e as Error); @@ -161,7 +163,7 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { const minInterval = 1 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; - await scheduleSnapshot(client, aid, "milestone", targetTime, true); + await scheduleSnapshot(client, aid, "milestone", targetTime); logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq"); } } catch (e) { diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index 4a302d1..13cbd53 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -30,8 +30,8 @@ export async function initMQ() { immediately: true, }, { opts: { - removeOnComplete: 1, - removeOnFail: 1, + removeOnComplete: 300, + removeOnFail: 600, }, }); @@ -40,8 +40,8 @@ export async function initMQ() { immediately: true, }, { opts: { - removeOnComplete: 1, - removeOnFail: 1, + removeOnComplete: 60, + removeOnFail: 600, }, });