ref: extract scheduleCleanup into individual file
improve: logic of scheduleCleanup
This commit is contained in:
parent
651eef0b9e
commit
7689e687ff
@ -6,3 +6,5 @@ export * from "./archiveSnapshots.ts";
|
|||||||
export * from "./dispatchMilestoneSnapshots.ts";
|
export * from "./dispatchMilestoneSnapshots.ts";
|
||||||
export * from "./dispatchRegularSnapshots.ts";
|
export * from "./dispatchRegularSnapshots.ts";
|
||||||
export * from "./snapshotVideo.ts";
|
export * from "./snapshotVideo.ts";
|
||||||
|
export * from "./scheduleCleanup.ts";
|
||||||
|
export * from "./snapshotTick.ts";
|
45
packages/crawler/mq/exec/scheduleCleanup.ts
Normal file
45
packages/crawler/mq/exec/scheduleCleanup.ts
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import { Job } from "npm:bullmq@5.45.2";
|
||||||
|
import { withDbConnection } from "db/withConnection.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import logger from "log/logger.ts";
|
||||||
|
import { scheduleSnapshot, setSnapshotStatus } from "db/snapshotSchedule.ts";
|
||||||
|
import { SECOND } from "@std/datetime";
|
||||||
|
import { getTimeoutSchedulesCount } from "mq/task/getTimeoutSchedulesCount.ts";
|
||||||
|
import { removeAllTimeoutSchedules } from "mq/task/removeAllTimeoutSchedules.ts";
|
||||||
|
|
||||||
|
export const scheduleCleanupWorker = async (_job: Job): Promise<void> =>
|
||||||
|
await withDbConnection<void>(async (client: Client) => {
|
||||||
|
if (await getTimeoutSchedulesCount(client) > 2000) {
|
||||||
|
await removeAllTimeoutSchedules(client);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const query: string = `
|
||||||
|
SELECT id, aid, type
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE status IN ('pending', 'processing')
|
||||||
|
AND started_at < NOW() - INTERVAL '30 minutes'
|
||||||
|
UNION
|
||||||
|
SELECT id, aid, type
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE status IN ('pending', 'processing')
|
||||||
|
AND started_at < NOW() - INTERVAL '2 minutes'
|
||||||
|
AND type = 'milestone'
|
||||||
|
`;
|
||||||
|
const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
|
||||||
|
if (rows.length === 0) return;
|
||||||
|
for (const row of rows) {
|
||||||
|
const id = Number(row.id);
|
||||||
|
const aid = Number(row.aid);
|
||||||
|
const type = row.type;
|
||||||
|
await setSnapshotStatus(client, id, "timeout");
|
||||||
|
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
|
||||||
|
logger.log(
|
||||||
|
`Schedule ${id} has not received any response in a while, rescheduled.`,
|
||||||
|
"mq",
|
||||||
|
"fn:scheduleCleanupWorker",
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}, (e) => {
|
||||||
|
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
|
||||||
|
});
|
@ -5,11 +5,9 @@ import {
|
|||||||
bulkSetSnapshotStatus,
|
bulkSetSnapshotStatus,
|
||||||
getBulkSnapshotsInNextSecond,
|
getBulkSnapshotsInNextSecond,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond,
|
||||||
scheduleSnapshot,
|
|
||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
videoHasProcessingSchedule,
|
videoHasProcessingSchedule,
|
||||||
} from "db/snapshotSchedule.ts";
|
} from "db/snapshotSchedule.ts";
|
||||||
import { SECOND } from "@std/datetime";
|
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { SnapshotQueue } from "mq/index.ts";
|
import { SnapshotQueue } from "mq/index.ts";
|
||||||
|
|
||||||
@ -86,33 +84,3 @@ export const closetMilestone = (views: number) => {
|
|||||||
if (views < 1000000) return 1000000;
|
if (views < 1000000) return 1000000;
|
||||||
return 10000000;
|
return 10000000;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const scheduleCleanupWorker = async (_job: Job) => {
|
|
||||||
const client = await db.connect();
|
|
||||||
try {
|
|
||||||
const query = `
|
|
||||||
SELECT id, aid, type
|
|
||||||
FROM snapshot_schedule
|
|
||||||
WHERE status IN ('pending', 'processing')
|
|
||||||
AND started_at < NOW() - INTERVAL '30 minutes'
|
|
||||||
`;
|
|
||||||
const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
|
|
||||||
if (rows.length === 0) return;
|
|
||||||
for (const row of rows) {
|
|
||||||
const id = Number(row.id);
|
|
||||||
const aid = Number(row.aid);
|
|
||||||
const type = row.type;
|
|
||||||
await setSnapshotStatus(client, id, "timeout");
|
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
|
|
||||||
logger.log(
|
|
||||||
`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
|
|
||||||
"mq",
|
|
||||||
"fn:scheduleCleanupWorker",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
@ -61,7 +61,7 @@ export async function initMQ() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
||||||
every: 30 * MINUTE,
|
every: 2 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
13
packages/crawler/mq/task/getTimeoutSchedulesCount.ts
Normal file
13
packages/crawler/mq/task/getTimeoutSchedulesCount.ts
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
|
||||||
|
export async function getTimeoutSchedulesCount(client: Client) {
|
||||||
|
const query: string = `
|
||||||
|
SELECT COUNT(id)
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE status IN ('pending', 'processing')
|
||||||
|
AND started_at < NOW() - INTERVAL '30 minutes'
|
||||||
|
`;
|
||||||
|
|
||||||
|
const { rows } = await client.queryObject<{ count: number }>(query);
|
||||||
|
return rows[0].count;
|
||||||
|
}
|
16
packages/crawler/mq/task/removeAllTimeoutSchedules.ts
Normal file
16
packages/crawler/mq/task/removeAllTimeoutSchedules.ts
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
|
export async function removeAllTimeoutSchedules(client: Client) {
|
||||||
|
logger.log(
|
||||||
|
"Too many timeout schedules, directly removing these schedules...",
|
||||||
|
"mq",
|
||||||
|
"fn:scheduleCleanupWorker",
|
||||||
|
);
|
||||||
|
const query: string = `
|
||||||
|
DELETE FROM snapshot_schedule
|
||||||
|
WHERE status IN ('pending', 'processing')
|
||||||
|
AND started_at < NOW() - INTERVAL '30 minutes'
|
||||||
|
`;
|
||||||
|
await client.queryObject(query);
|
||||||
|
}
|
@ -1,11 +1,14 @@
|
|||||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||||
import {
|
import {
|
||||||
archiveSnapshotsWorker,
|
archiveSnapshotsWorker,
|
||||||
|
bulkSnapshotTickWorker,
|
||||||
collectSongsWorker,
|
collectSongsWorker,
|
||||||
dispatchMilestoneSnapshotsWorker,
|
dispatchMilestoneSnapshotsWorker,
|
||||||
dispatchRegularSnapshotsWorker,
|
dispatchRegularSnapshotsWorker,
|
||||||
getLatestVideosWorker,
|
getLatestVideosWorker,
|
||||||
getVideoInfoWorker,
|
getVideoInfoWorker,
|
||||||
|
scheduleCleanupWorker,
|
||||||
|
snapshotTickWorker,
|
||||||
snapshotVideoWorker,
|
snapshotVideoWorker,
|
||||||
takeBulkSnapshotForVideosWorker,
|
takeBulkSnapshotForVideosWorker,
|
||||||
} from "mq/exec/executors.ts";
|
} from "mq/exec/executors.ts";
|
||||||
@ -13,7 +16,6 @@ import { redis } from "@core/db/redis.ts";
|
|||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { lockManager } from "mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { WorkerError } from "mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import { bulkSnapshotTickWorker, scheduleCleanupWorker, snapshotTickWorker } from "mq/exec/snapshotTick.ts";
|
|
||||||
|
|
||||||
const releaseLockForJob = async (name: string) => {
|
const releaseLockForJob = async (name: string) => {
|
||||||
await lockManager.releaseLock(name);
|
await lockManager.releaseLock(name);
|
||||||
|
Loading…
Reference in New Issue
Block a user