ref: move some worker functions into individual files
This commit is contained in:
parent
0930bbe6f4
commit
e0a19499e1
29
packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts
Normal file
29
packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import { Job } from "npm:bullmq@5.45.2";
|
||||||
|
import { withDbConnection } from "db/withConnection.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import { getVideosNearMilestone } from "db/snapshot.ts";
|
||||||
|
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
|
||||||
|
import { truncate } from "utils/truncate.ts";
|
||||||
|
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
|
||||||
|
import logger from "log/logger.ts";
|
||||||
|
import { HOUR, MINUTE, SECOND } from "@std/datetime";
|
||||||
|
|
||||||
|
export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise<void> =>
|
||||||
|
withDbConnection(async (client: Client) => {
|
||||||
|
const videos = await getVideosNearMilestone(client);
|
||||||
|
for (const video of videos) {
|
||||||
|
const aid = Number(video.aid);
|
||||||
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
|
if (eta > 144) continue;
|
||||||
|
const now = Date.now();
|
||||||
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
|
const maxInterval = 4 * HOUR;
|
||||||
|
const minInterval = 1 * SECOND;
|
||||||
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
|
const targetTime = now + delay;
|
||||||
|
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||||
|
logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
|
||||||
|
}
|
||||||
|
}, (e) => {
|
||||||
|
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
|
||||||
|
});
|
39
packages/crawler/mq/exec/dispatchRegularSnapshots.ts
Normal file
39
packages/crawler/mq/exec/dispatchRegularSnapshots.ts
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
import { Job } from "npm:bullmq@5.45.2";
|
||||||
|
import { withDbConnection } from "db/withConnection.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import { getLatestVideoSnapshot } from "db/snapshot.ts";
|
||||||
|
import { truncate } from "utils/truncate.ts";
|
||||||
|
import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
|
||||||
|
import logger from "log/logger.ts";
|
||||||
|
import { HOUR, MINUTE, WEEK } from "@std/datetime";
|
||||||
|
import { lockManager } from "../lockManager.ts";
|
||||||
|
import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
|
||||||
|
|
||||||
|
export const dispatchRegularSnapshotsWorker = (_job: Job): Promise<void> =>
|
||||||
|
withDbConnection(async (client: Client) => {
|
||||||
|
const startedAt = Date.now();
|
||||||
|
if (await lockManager.isLocked("dispatchRegularSnapshots")) {
|
||||||
|
logger.log("dispatchRegularSnapshots is already running", "mq");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
|
||||||
|
|
||||||
|
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
|
||||||
|
for (const rawAid of aids) {
|
||||||
|
const aid = Number(rawAid);
|
||||||
|
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
||||||
|
const now = Date.now();
|
||||||
|
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||||
|
const interval = await getRegularSnapshotInterval(client, aid);
|
||||||
|
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
||||||
|
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
|
||||||
|
await scheduleSnapshot(client, aid, "normal", targetTime);
|
||||||
|
if (now - startedAt > 25 * MINUTE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, (e) => {
|
||||||
|
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
|
||||||
|
}, async () => {
|
||||||
|
await lockManager.releaseLock("dispatchRegularSnapshots");
|
||||||
|
});
|
@ -3,3 +3,5 @@ export * from "./getVideoInfo.ts";
|
|||||||
export * from "./collectSongs.ts";
|
export * from "./collectSongs.ts";
|
||||||
export * from "./takeBulkSnapshot.ts";
|
export * from "./takeBulkSnapshot.ts";
|
||||||
export * from "./archiveSnapshots.ts";
|
export * from "./archiveSnapshots.ts";
|
||||||
|
export * from "./dispatchMilestoneSnapshots.ts";
|
||||||
|
export * from "./dispatchRegularSnapshots.ts";
|
@ -104,62 +104,6 @@ export const closetMilestone = (views: number) => {
|
|||||||
return 10000000;
|
return 10000000;
|
||||||
};
|
};
|
||||||
|
|
||||||
export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|
||||||
const client = await db.connect();
|
|
||||||
try {
|
|
||||||
const videos = await getVideosNearMilestone(client);
|
|
||||||
for (const video of videos) {
|
|
||||||
const aid = Number(video.aid);
|
|
||||||
const eta = await getAdjustedShortTermETA(client, aid);
|
|
||||||
if (eta > 144) continue;
|
|
||||||
const now = Date.now();
|
|
||||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
|
||||||
const maxInterval = 4 * HOUR;
|
|
||||||
const minInterval = 1 * SECOND;
|
|
||||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
|
||||||
const targetTime = now + delay;
|
|
||||||
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
|
||||||
logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
|
||||||
} finally {
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export const regularSnapshotsWorker = async (_job: Job) => {
|
|
||||||
const client = await db.connect();
|
|
||||||
const startedAt = Date.now();
|
|
||||||
if (await lockManager.isLocked("dispatchRegularSnapshots")) {
|
|
||||||
logger.log("dispatchRegularSnapshots is already running", "mq");
|
|
||||||
client.release();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
|
|
||||||
try {
|
|
||||||
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
|
|
||||||
for (const rawAid of aids) {
|
|
||||||
const aid = Number(rawAid);
|
|
||||||
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
|
||||||
const now = Date.now();
|
|
||||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
|
||||||
const interval = await getRegularSnapshotInterval(client, aid);
|
|
||||||
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
|
||||||
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
|
|
||||||
await scheduleSnapshot(client, aid, "normal", targetTime);
|
|
||||||
if (now - startedAt > 25 * MINUTE) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
|
|
||||||
} finally {
|
|
||||||
await lockManager.releaseLock("dispatchRegularSnapshots");
|
|
||||||
client.release();
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||||
const id = job.data.id;
|
const id = job.data.id;
|
||||||
const aid = Number(job.data.aid);
|
const aid = Number(job.data.aid);
|
||||||
|
@ -45,7 +45,7 @@ export async function initMQ() {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("dispatchMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
|
@ -5,6 +5,8 @@ import {
|
|||||||
getLatestVideosWorker,
|
getLatestVideosWorker,
|
||||||
getVideoInfoWorker,
|
getVideoInfoWorker,
|
||||||
takeBulkSnapshotForVideosWorker,
|
takeBulkSnapshotForVideosWorker,
|
||||||
|
dispatchMilestoneSnapshotsWorker,
|
||||||
|
dispatchRegularSnapshotsWorker
|
||||||
} from "mq/exec/executors.ts";
|
} from "mq/exec/executors.ts";
|
||||||
import { redis } from "@core/db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
@ -12,8 +14,6 @@ import { lockManager } from "mq/lockManager.ts";
|
|||||||
import { WorkerError } from "mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import {
|
import {
|
||||||
bulkSnapshotTickWorker,
|
bulkSnapshotTickWorker,
|
||||||
collectMilestoneSnapshotsWorker,
|
|
||||||
regularSnapshotsWorker,
|
|
||||||
scheduleCleanupWorker,
|
scheduleCleanupWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
@ -86,10 +86,10 @@ const snapshotWorker = new Worker(
|
|||||||
return await takeSnapshotForVideoWorker(job);
|
return await takeSnapshotForVideoWorker(job);
|
||||||
case "snapshotTick":
|
case "snapshotTick":
|
||||||
return await snapshotTickWorker(job);
|
return await snapshotTickWorker(job);
|
||||||
case "collectMilestoneSnapshots":
|
case "dispatchMilestoneSnapshots":
|
||||||
return await collectMilestoneSnapshotsWorker(job);
|
return await dispatchMilestoneSnapshotsWorker(job);
|
||||||
case "dispatchRegularSnapshots":
|
case "dispatchRegularSnapshots":
|
||||||
return await regularSnapshotsWorker(job);
|
return await dispatchRegularSnapshotsWorker(job);
|
||||||
case "scheduleCleanup":
|
case "scheduleCleanup":
|
||||||
return await scheduleCleanupWorker(job);
|
return await scheduleCleanupWorker(job);
|
||||||
case "bulkSnapshotVideo":
|
case "bulkSnapshotVideo":
|
||||||
|
Loading…
Reference in New Issue
Block a user