diff --git a/packages/crawler/db/withConnection.ts b/packages/crawler/db/withConnection.ts index 0d8c29f..a965f41 100644 --- a/packages/crawler/db/withConnection.ts +++ b/packages/crawler/db/withConnection.ts @@ -11,7 +11,7 @@ import { db } from "db/init.ts"; */ export async function withDbConnection( operation: (client: Client) => Promise, - errorHandling?: (error: unknown) => void, + errorHandling?: (error: unknown, client: Client) => void, cleanup?: () => void, ): Promise { const client = await db.connect(); @@ -19,7 +19,7 @@ export async function withDbConnection( return await operation(client); } catch (error) { if (errorHandling) { - errorHandling(error); + errorHandling(error, client); return; } throw error; diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index 7d7f1c2..be9e02c 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -9,8 +9,8 @@ import { HOUR, MINUTE, WEEK } from "@std/datetime"; import { lockManager } from "../lockManager.ts"; import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts"; -export const dispatchRegularSnapshotsWorker = (_job: Job): Promise => - withDbConnection(async (client: Client) => { +export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise => + await withDbConnection(async (client: Client) => { const startedAt = Date.now(); if (await lockManager.isLocked("dispatchRegularSnapshots")) { logger.log("dispatchRegularSnapshots is already running", "mq"); diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index 63b353a..3aab9f2 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -4,4 +4,5 @@ export * from "./collectSongs.ts"; export * from "./takeBulkSnapshot.ts"; export * from "./archiveSnapshots.ts"; export * from "./dispatchMilestoneSnapshots.ts"; -export * from "./dispatchRegularSnapshots.ts"; \ No newline at end of file +export * from "./dispatchRegularSnapshots.ts"; +export * from "./snapshotVideo.ts"; \ No newline at end of file diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index 5465f52..4e9bddb 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -100,85 +100,6 @@ export const closetMilestone = (views: number) => { return 10000000; }; -export const takeSnapshotForVideoWorker = async (job: Job) => { - const id = job.data.id; - const aid = Number(job.data.aid); - const type = job.data.type; - const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo"; - const client = await db.connect(); - const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; - const exists = await snapshotScheduleExists(client, id); - if (!exists) { - client.release(); - return; - } - const status = await getBiliVideoStatus(client, aid); - if (status !== 0) { - client.release(); - return `REFUSE_WORKING_BILI_STATUS_${status}`; - } - try { - await setSnapshotStatus(client, id, "processing"); - const stat = await insertVideoSnapshot(client, aid, task); - if (typeof stat === "number") { - await setBiliVideoStatus(client, aid, stat); - await setSnapshotStatus(client, id, "completed"); - return `GET_BILI_STATUS_${stat}`; - } - await setSnapshotStatus(client, id, "completed"); - if (type === "normal") { - const interval = await getRegularSnapshotInterval(client, aid); - logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); - await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR); - return `DONE`; - } else if (type === "new") { - const publihsedAt = await getSongsPublihsedAt(client, aid); - const timeSincePublished = stat.time - publihsedAt!; - const viewsPerHour = stat.views / timeSincePublished * HOUR; - if (timeSincePublished > 48 * HOUR) { - return `DONE`; - } - if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) { - return `DONE`; - } - let intervalMins = 240; - if (viewsPerHour > 50) { - intervalMins = 120; - } - if (viewsPerHour > 100) { - intervalMins = 60; - } - if (viewsPerHour > 1000) { - intervalMins = 15; - } - await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true); - } - if (type !== "milestone") return `DONE`; - const eta = await getAdjustedShortTermETA(client, aid); - if (eta > 144) return "ETA_TOO_LONG"; - const now = Date.now(); - const targetTime = now + eta * HOUR; - await scheduleSnapshot(client, aid, type, targetTime); - await setSnapshotStatus(client, id, "completed"); - return `DONE`; - } catch (e) { - if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { - logger.warn( - `No available proxy for aid ${job.data.aid}.`, - "mq", - "fn:takeSnapshotForVideoWorker", - ); - await setSnapshotStatus(client, id, "no_proxy"); - await scheduleSnapshot(client, aid, type, Date.now() + retryInterval); - return; - } - logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); - await setSnapshotStatus(client, id, "failed"); - } finally { - client.release(); - } -}; - export const scheduleCleanupWorker = async (_job: Job) => { const client = await db.connect(); try { diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts new file mode 100644 index 0000000..20de59b --- /dev/null +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -0,0 +1,107 @@ +import { Job } from "npm:bullmq@5.45.2"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts"; +import logger from "log/logger.ts"; +import { HOUR, MINUTE, SECOND } from "@std/datetime"; +import { lockManager } from "mq/lockManager.ts"; +import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts"; +import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; +import { getSongsPublihsedAt } from "db/songs.ts"; +import { getAdjustedShortTermETA } from "mq/scheduling.ts"; +import { NetSchedulerError } from "@core/net/delegate.ts"; + +const snapshotTypeToTaskMap: { [key: string]: string } = { + "milestone": "snapshotMilestoneVideo", + "normal": "snapshotVideo", + "new": "snapshotMilestoneVideo", +}; + +export const snapshotVideoWorker = async (job: Job): Promise => { + const id = job.data.id; + const aid = Number(job.data.aid); + const type = job.data.type; + const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo"; + const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; + await withDbConnection(async (client: Client) => { + const exists = await snapshotScheduleExists(client, id); + if (!exists) { + return; + } + const status = await getBiliVideoStatus(client, aid); + if (status !== 0) { + logger.warn( + `Bilibili return status ${status} when snapshoting for ${aid}.`, + "mq", + "fn:dispatchRegularSnapshotsWorker", + ); + return; + } + await setSnapshotStatus(client, id, "processing"); + const stat = await insertVideoSnapshot(client, aid, task); + if (typeof stat === "number") { + await setBiliVideoStatus(client, aid, stat); + await setSnapshotStatus(client, id, "completed"); + logger.warn( + `Bilibili return status ${status} when snapshoting for ${aid}.`, + "mq", + "fn:dispatchRegularSnapshotsWorker", + ); + return; + } + await setSnapshotStatus(client, id, "completed"); + if (type === "new") { + const publihsedAt = await getSongsPublihsedAt(client, aid); + const timeSincePublished = stat.time - publihsedAt!; + const viewsPerHour = stat.views / timeSincePublished * HOUR; + if (timeSincePublished > 48 * HOUR) { + return; + } + if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) { + return; + } + let intervalMins = 240; + if (viewsPerHour > 50) { + intervalMins = 120; + } + if (viewsPerHour > 100) { + intervalMins = 60; + } + if (viewsPerHour > 1000) { + intervalMins = 15; + } + await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true); + } + if (type !== "milestone") return; + const eta = await getAdjustedShortTermETA(client, aid); + if (eta > 144) { + const etaHoursString = eta.toFixed(2) + " hrs"; + logger.warn( + `ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`, + "mq", + "fn:dispatchRegularSnapshotsWorker", + ); + } + const now = Date.now(); + const targetTime = now + eta * HOUR; + await scheduleSnapshot(client, aid, type, targetTime); + await setSnapshotStatus(client, id, "completed"); + return; + }, async (e, client) => { + if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { + logger.warn( + `No available proxy for aid ${job.data.aid}.`, + "mq", + "fn:takeSnapshotForVideoWorker", + ); + await setSnapshotStatus(client, id, "no_proxy"); + await scheduleSnapshot(client, aid, type, Date.now() + retryInterval); + return; + } + logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + await setSnapshotStatus(client, id, "failed"); + }, async () => { + await lockManager.releaseLock("dispatchRegularSnapshots"); + }); + return; +}; diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index df3d4fc..84023fe 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -2,22 +2,18 @@ import { ConnectionOptions, Job, Worker } from "bullmq"; import { archiveSnapshotsWorker, collectSongsWorker, + dispatchMilestoneSnapshotsWorker, + dispatchRegularSnapshotsWorker, getLatestVideosWorker, getVideoInfoWorker, + snapshotVideoWorker, takeBulkSnapshotForVideosWorker, - dispatchMilestoneSnapshotsWorker, - dispatchRegularSnapshotsWorker } from "mq/exec/executors.ts"; import { redis } from "@core/db/redis.ts"; import logger from "log/logger.ts"; import { lockManager } from "mq/lockManager.ts"; import { WorkerError } from "mq/schema.ts"; -import { - bulkSnapshotTickWorker, - scheduleCleanupWorker, - snapshotTickWorker, - takeSnapshotForVideoWorker, -} from "mq/exec/snapshotTick.ts"; +import { bulkSnapshotTickWorker, scheduleCleanupWorker, snapshotTickWorker } from "mq/exec/snapshotTick.ts"; const releaseLockForJob = async (name: string) => { await lockManager.releaseLock(name); @@ -83,7 +79,7 @@ const snapshotWorker = new Worker( async (job: Job) => { switch (job.name) { case "snapshotVideo": - return await takeSnapshotForVideoWorker(job); + return await snapshotVideoWorker(job); case "snapshotTick": return await snapshotTickWorker(job); case "dispatchMilestoneSnapshots":