diff --git a/packages/backend/root.ts b/packages/backend/root.ts index d8024c7..aa2a1e9 100644 --- a/packages/backend/root.ts +++ b/packages/backend/root.ts @@ -3,10 +3,10 @@ import { VERSION } from "./main.ts"; import { createHandlers } from "./utils.ts"; export const rootHandler = createHandlers((c) => { - let singer: Singer | Singer[] | null; + let singer: Singer | Singer[]; const shouldShowSpecialSinger = Math.random() < 0.016; if (getSingerForBirthday().length !== 0) { - singer = getSingerForBirthday(); + singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[]; for (const s of singer) { delete s.birthday; s.message = `祝${s.name}生日快乐~`; diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts index 119e353..25c582d 100644 --- a/packages/crawler/db/snapshot.ts +++ b/packages/crawler/db/snapshot.ts @@ -6,8 +6,16 @@ export async function getVideosNearMilestone(client: Client) { const queryResult = await client.queryObject(` SELECT ls.* FROM latest_video_snapshot ls + RIGHT JOIN songs ON songs.aid = ls.aid WHERE - views < 100000 OR + (views >= 50000 AND views < 100000) OR + (views >= 900000 AND views < 1000000) OR + (views >= 9900000 AND views < 10000000) + UNION + SELECT ls.* + FROM latest_video_snapshot ls + WHERE + (views >= 90000 AND views < 100000) OR (views >= 900000 AND views < 1000000) OR (views >= 9900000 AND views < 10000000) `); diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index d6050ac..24de1f4 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -1,7 +1,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { SnapshotScheduleType } from "@core/db/schema"; import logger from "log/logger.ts"; -import { MINUTE } from "@std/datetime"; +import { MINUTE } from "$std/datetime/constants.ts"; import { redis } from "@core/db/redis.ts"; import { Redis } from "ioredis"; @@ -272,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) { export async function getBulkSnapshotsInNextSecond(client: Client) { const query = ` - SELECT * - FROM snapshot_schedule - WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal' - ORDER BY started_at - LIMIT 1000; + SELECT * + FROM snapshot_schedule + WHERE (started_at <= NOW() + INTERVAL '15 seconds') + AND status = 'pending' + AND (type = 'normal' OR type = 'archive') + ORDER BY CASE + WHEN type = 'normal' THEN 1 + WHEN type = 'archive' THEN 2 + END, + started_at + LIMIT 1000; `; const res = await client.queryObject(query, []); return res.rows; @@ -306,3 +312,14 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) { const res = await client.queryObject<{ aid: number }>(query, []); return res.rows.map((r) => Number(r.aid)); } + +export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) { + const query: string = ` + SELECT s.aid + FROM bilibili_metadata s + LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + WHERE ss.aid IS NULL + `; + const res = await client.queryObject<{ aid: number }>(query, []); + return res.rows.map((r) => Number(r.aid)); +} diff --git a/packages/crawler/deno.json b/packages/crawler/deno.json index dee1698..1b1784d 100644 --- a/packages/crawler/deno.json +++ b/packages/crawler/deno.json @@ -12,7 +12,7 @@ "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", "adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", - "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", + "all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" }, "lint": { diff --git a/packages/crawler/mq/exec/archiveSnapshots.ts b/packages/crawler/mq/exec/archiveSnapshots.ts new file mode 100644 index 0000000..a35738a --- /dev/null +++ b/packages/crawler/mq/exec/archiveSnapshots.ts @@ -0,0 +1,40 @@ +import { Job } from "npm:bullmq@5.45.2"; +import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import logger from "log/logger.ts"; +import { lockManager } from "mq/lockManager.ts"; +import { getLatestVideoSnapshot } from "db/snapshot.ts"; +import { HOUR, MINUTE } from "$std/datetime/constants.ts"; + +export const archiveSnapshotsWorker = async (_job: Job) => + await withDbConnection(async (client: Client) => { + const startedAt = Date.now(); + if (await lockManager.isLocked("dispatchArchiveSnapshots")) { + logger.log("dispatchArchiveSnapshots is already running", "mq"); + return; + } + await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60); + const aids = await getAllVideosWithoutActiveSnapshotSchedule(client); + for (const rawAid of aids) { + const aid = Number(rawAid); + const latestSnapshot = await getLatestVideoSnapshot(client, aid); + const now = Date.now(); + const lastSnapshotedAt = latestSnapshot?.time ?? now; + const interval = 168; + logger.log( + `Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, + "mq", + "fn:archiveSnapshotsWorker", + ); + const targetTime = lastSnapshotedAt + interval * HOUR; + await scheduleSnapshot(client, aid, "archive", targetTime); + if (now - startedAt > 250 * MINUTE) { + return; + } + } + }, (e) => { + logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker"); + }, async () => { + await lockManager.releaseLock("dispatchArchiveSnapshots"); + }); diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index cddb5e4..70d4191 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -1,4 +1,5 @@ export * from "mq/exec/getLatestVideos.ts"; export * from "./getVideoInfo.ts"; export * from "./collectSongs.ts"; -export * from "./takeBulkSnapshot.ts"; \ No newline at end of file +export * from "./takeBulkSnapshot.ts"; +export * from "./archiveSnapshots.ts"; diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index db04b97..b05d393 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -3,8 +3,7 @@ import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; import { withDbConnection } from "db/withConnection.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; - export const getLatestVideosWorker = (_job: Job): Promise => withDbConnection(async (client: Client) => { - await queueLatestVideos(client) + await queueLatestVideos(client); }); diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts index 5397faf..2ed2a8a 100644 --- a/packages/crawler/mq/exec/getVideoInfo.ts +++ b/packages/crawler/mq/exec/getVideoInfo.ts @@ -11,5 +11,5 @@ export const getVideoInfoWorker = async (job: Job): Promise => logger.warn("aid does not exists", "mq", "job:getVideoInfo"); return; } - await insertVideoInfo(client, aid) + await insertVideoInfo(client, aid); }); diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts index 31f2776..1f387eb 100644 --- a/packages/crawler/mq/exec/snapshotTick.ts +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -4,6 +4,7 @@ import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts"; import { bulkGetVideosWithoutProcessingSchedules, bulkSetSnapshotStatus, + getAllVideosWithoutActiveSnapshotSchedule, getBulkSnapshotsInNextSecond, getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule, @@ -47,13 +48,19 @@ export const bulkSnapshotTickWorker = async (_job: Job) => { const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids); if (filteredAids.length === 0) continue; await bulkSetSnapshotStatus(client, filteredAids, "processing"); - const dataMap: { [key: number]: number } = {}; - for (const schedule of group) { - const id = Number(schedule.id); - dataMap[id] = Number(schedule.aid); - } + const schedulesData = group.map((schedule) => { + return { + aid: Number(schedule.aid), + id: Number(schedule.id), + type: schedule.type, + created_at: schedule.created_at, + started_at: schedule.started_at, + finished_at: schedule.finished_at, + status: schedule.status, + }; + }); await SnapshotQueue.add("bulkSnapshotVideo", { - map: dataMap, + schedules: schedulesData, }, { priority: 3 }); } return `OK`; @@ -79,7 +86,7 @@ export const snapshotTickWorker = async (_job: Job) => { const aid = Number(schedule.aid); await setSnapshotStatus(client, schedule.id, "processing"); await SnapshotQueue.add("snapshotVideo", { - aid: aid, + aid: Number(aid), id: Number(schedule.id), type: schedule.type ?? "normal", }, { priority }); @@ -222,7 +229,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => { "mq", "fn:takeSnapshotForVideoWorker", ); - await setSnapshotStatus(client, id, "completed"); + await setSnapshotStatus(client, id, "no_proxy"); await scheduleSnapshot(client, aid, type, Date.now() + retryInterval); return; } diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index f32de60..eb27119 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -11,15 +11,17 @@ import logger from "log/logger.ts"; import { NetSchedulerError } from "@core/net/delegate.ts"; import { HOUR, MINUTE, SECOND } from "@std/datetime"; import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts"; +import { SnapshotScheduleType } from "@core/db/schema"; export const takeBulkSnapshotForVideosWorker = async (job: Job) => { - const dataMap: { [key: number]: number } = job.data.map; - const ids = Object.keys(dataMap).map((id) => Number(id)); + const schedules: SnapshotScheduleType[] = job.data.schedules; + const ids = schedules.map((schedule) => Number(schedule.id)); const aidsToFetch: number[] = []; const client = await db.connect(); try { - for (const id of ids) { - const aid = Number(dataMap[id]); + for (const schedule of schedules) { + const aid = Number(schedule.aid); + const id = Number(schedule.id); const exists = await snapshotScheduleExists(client, id); if (!exists) { continue; @@ -43,8 +45,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { const shares = stat.share; const favorites = stat.collect; const query: string = ` - INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) `; await client.queryObject( query, @@ -54,7 +56,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker"); } await bulkSetSnapshotStatus(client, ids, "completed"); - for (const aid of aidsToFetch) { + + for (const schedule of schedules) { + const aid = Number(schedule.aid); + const type = schedule.type; + if (type == "archive") continue; const interval = await getRegularSnapshotInterval(client, aid); logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR); @@ -67,8 +73,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { "mq", "fn:takeBulkSnapshotForVideosWorker", ); - await bulkSetSnapshotStatus(client, ids, "completed"); - await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE); + await bulkSetSnapshotStatus(client, ids, "no_proxy"); + await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random()); return; } logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker"); diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index 4bffb4d..18703b7 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -1,4 +1,4 @@ -import { MINUTE, SECOND } from "@std/datetime"; +import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts"; import logger from "log/logger.ts"; import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts"; @@ -55,6 +55,11 @@ export async function initMQ() { immediately: true, }); + await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", { + every: 6 * HOUR, + immediately: true, + }); + await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { every: 30 * MINUTE, immediately: true, diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 9b92913..51a84e1 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -1,5 +1,11 @@ import { ConnectionOptions, Job, Worker } from "bullmq"; -import { collectSongsWorker, getLatestVideosWorker, getVideoInfoWorker } from "mq/exec/executors.ts"; +import { + archiveSnapshotsWorker, + collectSongsWorker, + getLatestVideosWorker, + getVideoInfoWorker, + takeBulkSnapshotForVideosWorker, +} from "mq/exec/executors.ts"; import { redis } from "@core/db/redis.ts"; import logger from "log/logger.ts"; import { lockManager } from "mq/lockManager.ts"; @@ -12,10 +18,22 @@ import { snapshotTickWorker, takeSnapshotForVideoWorker, } from "mq/exec/snapshotTick.ts"; -import {takeBulkSnapshotForVideosWorker} from "mq/exec/executors.ts"; + +const releaseLockForJob = async (name: string) => { + await lockManager.releaseLock(name); + logger.log(`Released lock: ${name}`, "mq"); +}; + +const releaseAllLocks = async () => { + const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"]; + for (const lock of locks) { + await releaseLockForJob(lock); + } +}; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); Deno.exit(); @@ -23,6 +41,7 @@ Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGTERM", async () => { logger.log("SIGTERM Received: Shutting down workers...", "mq"); + await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); Deno.exit(); @@ -59,10 +78,6 @@ latestVideoWorker.on("error", (err) => { logger.error(e.rawError, e.service, e.codePath); }); -latestVideoWorker.on("closed", async () => { - await lockManager.releaseLock("getLatestVideos"); -}); - const snapshotWorker = new Worker( "snapshot", async (job: Job) => { @@ -81,6 +96,8 @@ const snapshotWorker = new Worker( return await takeBulkSnapshotForVideosWorker(job); case "bulkSnapshotTick": return await bulkSnapshotTickWorker(job); + case "dispatchArchiveSnapshots": + return await archiveSnapshotsWorker(job); default: break; } @@ -92,7 +109,3 @@ snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); }); - -snapshotWorker.on("closed", async () => { - await lockManager.releaseLock("dispatchRegularSnapshots"); -});