fix: unlimited addition of milestone tasks

This commit is contained in:
alikia2x (寒寒) 2025-04-07 00:28:28 +08:00
parent 38c0cbd371
commit da1bea7f41
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
3 changed files with 16 additions and 6 deletions

View File

@ -71,6 +71,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
return res.rows.length > 0; return res.rows.length > 0;
} }
export async function videoHasActiveScheduleWithType(client: Client, aid: number, type: string) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing') AND type = $2`,
[aid, type],
);
return res.rows.length > 0;
}
export async function videoHasProcessingSchedule(client: Client, aid: number) { export async function videoHasProcessingSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>( const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`, `SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
@ -193,7 +201,7 @@ export async function scheduleSnapshot(
targetTime: number, targetTime: number,
force: boolean = false, force: boolean = false,
) { ) {
if (await videoHasActiveSchedule(client, aid) && !force) return; if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return;
let adjustedTime = new Date(targetTime); let adjustedTime = new Date(targetTime);
if (type !== "milestone" && type !== "new") { if (type !== "milestone" && type !== "new") {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);

View File

@ -61,6 +61,7 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
map: dataMap, map: dataMap,
}, { priority: 3 }); }, { priority: 3 });
} }
return `OK`
} catch (e) { } catch (e) {
logger.error(e as Error); logger.error(e as Error);
} finally { } finally {
@ -87,6 +88,7 @@ export const snapshotTickWorker = async (_job: Job) => {
id: Number(schedule.id), id: Number(schedule.id),
type: schedule.type ?? "normal", type: schedule.type ?? "normal",
}, { priority }); }, { priority });
return `OK`;
} }
} catch (e) { } catch (e) {
logger.error(e as Error); logger.error(e as Error);
@ -161,7 +163,7 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
const minInterval = 1 * SECOND; const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay; const targetTime = now + delay;
await scheduleSnapshot(client, aid, "milestone", targetTime, true); await scheduleSnapshot(client, aid, "milestone", targetTime);
logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq"); logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
} }
} catch (e) { } catch (e) {

View File

@ -30,8 +30,8 @@ export async function initMQ() {
immediately: true, immediately: true,
}, { }, {
opts: { opts: {
removeOnComplete: 1, removeOnComplete: 300,
removeOnFail: 1, removeOnFail: 600,
}, },
}); });
@ -40,8 +40,8 @@ export async function initMQ() {
immediately: true, immediately: true,
}, { }, {
opts: { opts: {
removeOnComplete: 1, removeOnComplete: 60,
removeOnFail: 1, removeOnFail: 600,
}, },
}); });