fix: mixing bulk tasks with other tasks
This commit is contained in:
parent
cd160c486e
commit
49098763f1
@ -256,13 +256,25 @@ export async function getSnapshotsInNextSecond(client: Client) {
|
|||||||
const query = `
|
const query = `
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM snapshot_schedule
|
FROM snapshot_schedule
|
||||||
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
|
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal'
|
||||||
ORDER BY
|
ORDER BY
|
||||||
CASE
|
CASE
|
||||||
WHEN type = 'milestone' THEN 0
|
WHEN type = 'milestone' THEN 0
|
||||||
ELSE 1
|
ELSE 1
|
||||||
END,
|
END,
|
||||||
started_at
|
started_at
|
||||||
|
LIMIT 10;
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
|
return res.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getBulkSnapshotsInNextSecond(client: Client) {
|
||||||
|
const query = `
|
||||||
|
SELECT *
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
|
||||||
|
ORDER BY started_at
|
||||||
LIMIT 100;
|
LIMIT 100;
|
||||||
`;
|
`;
|
||||||
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
|
@ -15,6 +15,7 @@ import {
|
|||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
snapshotScheduleExists,
|
snapshotScheduleExists,
|
||||||
videoHasProcessingSchedule,
|
videoHasProcessingSchedule,
|
||||||
|
getBulkSnapshotsInNextSecond
|
||||||
} from "lib/db/snapshotSchedule.ts";
|
} from "lib/db/snapshotSchedule.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
||||||
@ -39,10 +40,10 @@ const snapshotTypeToTaskMap: { [key: string]: string } = {
|
|||||||
"new": "snapshotMilestoneVideo",
|
"new": "snapshotMilestoneVideo",
|
||||||
};
|
};
|
||||||
|
|
||||||
export const snapshotTickWorker = async (_job: Job) => {
|
export const bulkSnapshotTickWorker = async (_job: Job) => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
try {
|
try {
|
||||||
const schedules = await getSnapshotsInNextSecond(client);
|
const schedules = await getBulkSnapshotsInNextSecond(client);
|
||||||
const count = schedules.length;
|
const count = schedules.length;
|
||||||
const groups = Math.ceil(count / 30);
|
const groups = Math.ceil(count / 30);
|
||||||
for (let i = 0; i < groups; i++) {
|
for (let i = 0; i < groups; i++) {
|
||||||
@ -60,6 +61,17 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
map: dataMap,
|
map: dataMap,
|
||||||
}, { priority: 3 });
|
}, { priority: 3 });
|
||||||
}
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error);
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const snapshotTickWorker = async (_job: Job) => {
|
||||||
|
const client = await db.connect();
|
||||||
|
try {
|
||||||
|
const schedules = await getSnapshotsInNextSecond(client);
|
||||||
for (const schedule of schedules) {
|
for (const schedule of schedules) {
|
||||||
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
||||||
return `ALREADY_PROCESSING`;
|
return `ALREADY_PROCESSING`;
|
||||||
|
@ -35,6 +35,16 @@ export async function initMQ() {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await SnapshotQueue.upsertJobScheduler("bulkSnapshotTick", {
|
||||||
|
every: 15 * SECOND,
|
||||||
|
immediately: true,
|
||||||
|
}, {
|
||||||
|
opts: {
|
||||||
|
removeOnComplete: 1,
|
||||||
|
removeOnFail: 1,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
|
@ -11,7 +11,8 @@ import {
|
|||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
scheduleCleanupWorker,
|
scheduleCleanupWorker,
|
||||||
takeBulkSnapshotForVideosWorker
|
takeBulkSnapshotForVideosWorker,
|
||||||
|
bulkSnapshotTickWorker
|
||||||
} from "lib/mq/exec/snapshotTick.ts";
|
} from "lib/mq/exec/snapshotTick.ts";
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
@ -88,6 +89,9 @@ const snapshotWorker = new Worker(
|
|||||||
case "bulkSnapshotVideo":
|
case "bulkSnapshotVideo":
|
||||||
await takeBulkSnapshotForVideosWorker(job);
|
await takeBulkSnapshotForVideosWorker(job);
|
||||||
break;
|
break;
|
||||||
|
case "bulkSnapshotTick":
|
||||||
|
await bulkSnapshotTickWorker(job);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user