ref: extract snapshotVideoWorker into individual file

This commit is contained in:
alikia2x (寒寒) 2025-04-14 05:36:07 +08:00
parent c7dd1cfc2e
commit 3d9e98c949
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
6 changed files with 118 additions and 93 deletions

View File

@ -11,7 +11,7 @@ import { db } from "db/init.ts";
*/
export async function withDbConnection<T>(
operation: (client: Client) => Promise<T>,
errorHandling?: (error: unknown) => void,
errorHandling?: (error: unknown, client: Client) => void,
cleanup?: () => void,
): Promise<T | undefined> {
const client = await db.connect();
@ -19,7 +19,7 @@ export async function withDbConnection<T>(
return await operation(client);
} catch (error) {
if (errorHandling) {
errorHandling(error);
errorHandling(error, client);
return;
}
throw error;

View File

@ -9,8 +9,8 @@ import { HOUR, MINUTE, WEEK } from "@std/datetime";
import { lockManager } from "../lockManager.ts";
import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
export const dispatchRegularSnapshotsWorker = (_job: Job): Promise<void> =>
withDbConnection(async (client: Client) => {
export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =>
await withDbConnection(async (client: Client) => {
const startedAt = Date.now();
if (await lockManager.isLocked("dispatchRegularSnapshots")) {
logger.log("dispatchRegularSnapshots is already running", "mq");

View File

@ -4,4 +4,5 @@ export * from "./collectSongs.ts";
export * from "./takeBulkSnapshot.ts";
export * from "./archiveSnapshots.ts";
export * from "./dispatchMilestoneSnapshots.ts";
export * from "./dispatchRegularSnapshots.ts";
export * from "./dispatchRegularSnapshots.ts";
export * from "./snapshotVideo.ts";

View File

@ -100,85 +100,6 @@ export const closetMilestone = (views: number) => {
return 10000000;
};
export const takeSnapshotForVideoWorker = async (job: Job) => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const client = await db.connect();
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
client.release();
return;
}
const status = await getBiliVideoStatus(client, aid);
if (status !== 0) {
client.release();
return `REFUSE_WORKING_BILI_STATUS_${status}`;
}
try {
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed");
return `GET_BILI_STATUS_${stat}`;
}
await setSnapshotStatus(client, id, "completed");
if (type === "normal") {
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
return `DONE`;
} else if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(client, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
if (timeSincePublished > 48 * HOUR) {
return `DONE`;
}
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
return `DONE`;
}
let intervalMins = 240;
if (viewsPerHour > 50) {
intervalMins = 120;
}
if (viewsPerHour > 100) {
intervalMins = 60;
}
if (viewsPerHour > 1000) {
intervalMins = 15;
}
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
}
if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 144) return "ETA_TOO_LONG";
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, aid, type, targetTime);
await setSnapshotStatus(client, id, "completed");
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(client, id, "no_proxy");
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
return;
}
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
await setSnapshotStatus(client, id, "failed");
} finally {
client.release();
}
};
export const scheduleCleanupWorker = async (_job: Job) => {
const client = await db.connect();
try {

View File

@ -0,0 +1,107 @@
import { Job } from "npm:bullmq@5.45.2";
import { withDbConnection } from "db/withConnection.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
import logger from "log/logger.ts";
import { HOUR, MINUTE, SECOND } from "@std/datetime";
import { lockManager } from "mq/lockManager.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { getSongsPublihsedAt } from "db/songs.ts";
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
import { NetSchedulerError } from "@core/net/delegate.ts";
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
"new": "snapshotMilestoneVideo",
};
export const snapshotVideoWorker = async (job: Job): Promise<void> => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
await withDbConnection(async (client: Client) => {
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
return;
}
const status = await getBiliVideoStatus(client, aid);
if (status !== 0) {
logger.warn(
`Bilibili return status ${status} when snapshoting for ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
);
return;
}
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed");
logger.warn(
`Bilibili return status ${status} when snapshoting for ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
);
return;
}
await setSnapshotStatus(client, id, "completed");
if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(client, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
if (timeSincePublished > 48 * HOUR) {
return;
}
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
return;
}
let intervalMins = 240;
if (viewsPerHour > 50) {
intervalMins = 120;
}
if (viewsPerHour > 100) {
intervalMins = 60;
}
if (viewsPerHour > 1000) {
intervalMins = 15;
}
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
}
if (type !== "milestone") return;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 144) {
const etaHoursString = eta.toFixed(2) + " hrs";
logger.warn(
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
);
}
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, aid, type, targetTime);
await setSnapshotStatus(client, id, "completed");
return;
}, async (e, client) => {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(client, id, "no_proxy");
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
return;
}
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
await setSnapshotStatus(client, id, "failed");
}, async () => {
await lockManager.releaseLock("dispatchRegularSnapshots");
});
return;
};

View File

@ -2,22 +2,18 @@ import { ConnectionOptions, Job, Worker } from "bullmq";
import {
archiveSnapshotsWorker,
collectSongsWorker,
dispatchMilestoneSnapshotsWorker,
dispatchRegularSnapshotsWorker,
getLatestVideosWorker,
getVideoInfoWorker,
snapshotVideoWorker,
takeBulkSnapshotForVideosWorker,
dispatchMilestoneSnapshotsWorker,
dispatchRegularSnapshotsWorker
} from "mq/exec/executors.ts";
import { redis } from "@core/db/redis.ts";
import logger from "log/logger.ts";
import { lockManager } from "mq/lockManager.ts";
import { WorkerError } from "mq/schema.ts";
import {
bulkSnapshotTickWorker,
scheduleCleanupWorker,
snapshotTickWorker,
takeSnapshotForVideoWorker,
} from "mq/exec/snapshotTick.ts";
import { bulkSnapshotTickWorker, scheduleCleanupWorker, snapshotTickWorker } from "mq/exec/snapshotTick.ts";
const releaseLockForJob = async (name: string) => {
await lockManager.releaseLock(name);
@ -83,7 +79,7 @@ const snapshotWorker = new Worker(
async (job: Job) => {
switch (job.name) {
case "snapshotVideo":
return await takeSnapshotForVideoWorker(job);
return await snapshotVideoWorker(job);
case "snapshotTick":
return await snapshotTickWorker(job);
case "dispatchMilestoneSnapshots":