diff --git a/.idea/deno.xml b/.idea/deno.xml
index 2e4b145..decfd9a 100644
--- a/.idea/deno.xml
+++ b/.idea/deno.xml
@@ -1,6 +1,6 @@
-
\ No newline at end of file
diff --git a/packages/backend/videoInfo.ts b/packages/backend/videoInfo.ts
index 6ea974c..fd8bf89 100644
--- a/packages/backend/videoInfo.ts
+++ b/packages/backend/videoInfo.ts
@@ -10,7 +10,6 @@ import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import type { BlankEnv, BlankInput } from "hono/types";
 import type { VideoInfoData } from "@core/net/bilibili.d.ts";

-
 const redis = new Redis({ maxRetriesPerRequest: null });

 const CACHE_EXPIRATION_SECONDS = 60;
@@ -39,7 +38,6 @@ async function insertVideoSnapshot(client: Client, data: VideoInfoData) {
 	logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot");
 }

-
 export const videoInfoHandler = createHandlers(async (c: ContextType) => {
 	const client = c.get("db");
 	try {
@@ -83,4 +81,4 @@ export const videoInfoHandler = createHandlers(async (c: ContextType) => {
 			return c.json({ message: "Unhandled error", error: e }, 500);
 		}
 	}
-});
\ No newline at end of file
+});
diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts
index fa56f44..d6050ac 100644
--- a/packages/crawler/db/snapshotSchedule.ts
+++ b/packages/crawler/db/snapshotSchedule.ts
@@ -1,8 +1,8 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
-import { redis } from "../../core/db/redis.ts";
+import { MINUTE } from "@std/datetime";
+import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";

 const REDIS_KEY = "cvsa:snapshot_window_counts";
diff --git a/packages/crawler/deno.json b/packages/crawler/deno.json
index 7ceed53..dee1698 100644
--- a/packages/crawler/deno.json
+++ b/packages/crawler/deno.json
@@ -23,6 +23,7 @@
 	"imports": {
 		"@std/assert": "jsr:@std/assert@1",
 		"$std/": "https://deno.land/std@0.216.0/",
+		"@std/datetime": "jsr:@std/datetime@^0.225.4",
 		"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
 		"bullmq": "npm:bullmq",
 		"mq/": "./mq/",
diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts
index c813b7b..f4c1f43 100644
--- a/packages/crawler/mq/exec/classifyVideo.ts
+++ b/packages/crawler/mq/exec/classifyVideo.ts
@@ -8,7 +8,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { aidExistsInSongs } from "db/songs.ts";
 import { insertIntoSongs } from "mq/task/collectSongs.ts";
 import { scheduleSnapshot } from "db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
+import { MINUTE } from "@std/datetime";

 export const classifyVideoWorker = async (job: Job) => {
 	const client = await db.connect();
diff --git a/packages/crawler/mq/exec/collectSongs.ts b/packages/crawler/mq/exec/collectSongs.ts
new file mode 100644
index 0000000..dc059fc
--- /dev/null
+++ b/packages/crawler/mq/exec/collectSongs.ts
@@ -0,0 +1,12 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { db } from "db/init.ts";
+import { collectSongs } from "mq/task/collectSongs.ts";
+
+export const collectSongsWorker = async (_job: Job): Promise<void> => {
+	const client = await db.connect();
+	try {
+		await collectSongs(client);
+	} finally {
+		client.release();
+	}
+};
diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts
new file mode 100644
index 0000000..f3fe95b
--- /dev/null
+++ b/packages/crawler/mq/exec/executors.ts
@@ -0,0 +1,3 @@
+export * from "mq/exec/getLatestVideos.ts";
+export * from "./getVideoInfo.ts";
+export * from "./collectSongs.ts";
diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts
index 7a19738..4112ecd 100644
--- a/packages/crawler/mq/exec/getLatestVideos.ts
+++ b/packages/crawler/mq/exec/getLatestVideos.ts
@@ -1,8 +1,6 @@
 import { Job } from "bullmq";
 import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
 import { db } from "db/init.ts";
-import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
-import { collectSongs } from "mq/task/collectSongs.ts";

 export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
 	const client = await db.connect();
@@ -12,26 +10,3 @@ export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
 		client.release();
 	}
 };
-
-export const collectSongsWorker = async (_job: Job): Promise<void> => {
-	const client = await db.connect();
-	try {
-		await collectSongs(client);
-	} finally {
-		client.release();
-	}
-};
-
-export const getVideoInfoWorker = async (job: Job): Promise<number> => {
-	const client = await db.connect();
-	try {
-		const aid = job.data.aid;
-		if (!aid) {
-			return 3;
-		}
-		await insertVideoInfo(client, aid);
-		return 0;
-	} finally {
-		client.release();
-	}
-};
diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts
new file mode 100644
index 0000000..3bdf038
--- /dev/null
+++ b/packages/crawler/mq/exec/getVideoInfo.ts
@@ -0,0 +1,17 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { db } from "db/init.ts";
+import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
+
+export const getVideoInfoWorker = async (job: Job): Promise<number> => {
+	const client = await db.connect();
+	try {
+		const aid = job.data.aid;
+		if (!aid) {
+			return 3;
+		}
+		await insertVideoInfo(client, aid);
+		return 0;
+	} finally {
+		client.release();
+	}
+};
diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts
index 1c15bc4..6b8365a 100644
--- a/packages/crawler/mq/exec/snapshotTick.ts
+++ b/packages/crawler/mq/exec/snapshotTick.ts
@@ -3,12 +3,8 @@ import { db } from "db/init.ts";
 import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
 import {
 	bulkGetVideosWithoutProcessingSchedules,
-	bulkScheduleSnapshot,
 	bulkSetSnapshotStatus,
-	findClosestSnapshot,
-	findSnapshotBefore,
 	getBulkSnapshotsInNextSecond,
-	getLatestSnapshot,
 	getSnapshotsInNextSecond,
 	getVideosWithoutActiveSnapshotSchedule,
 	scheduleSnapshot,
@@ -16,8 +12,7 @@ import {
 	snapshotScheduleExists,
 	videoHasProcessingSchedule,
 } from "db/snapshotSchedule.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE, SECOND, WEEK } from "@std/datetime";
 import logger from "log/logger.ts";
 import { SnapshotQueue } from "mq/index.ts";
 import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
@@ -26,8 +21,8 @@ import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
 import { truncate } from "utils/truncate.ts";
 import { lockManager } from "mq/lockManager.ts";
 import { getSongsPublihsedAt } from "db/songs.ts";
-import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
 import { getAdjustedShortTermETA } from "../scheduling.ts";
+import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";

 const priorityMap: { [key: string]: number } = {
 	"milestone": 1,
@@ -61,7 +56,7 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
 				map: dataMap,
 			}, { priority: 3 });
 		}
-		return `OK`
+		return `OK`;
 	} catch (e) {
 		logger.error(e as Error);
 	} finally {
@@ -127,25 +122,6 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
 	}
 };

-const getRegularSnapshotInterval = async (client: Client, aid: number) => {
-	const now = Date.now();
-	const date = new Date(now - 24 * HOUR);
-	let oldSnapshot = await findSnapshotBefore(client, aid, date);
-	if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
-	const latestSnapshot = await getLatestSnapshot(client, aid);
-	if (!oldSnapshot || !latestSnapshot) return 0;
-	if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
-	const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
-	if (hoursDiff < 8) return 24;
-	const viewsDiff = latestSnapshot.views - oldSnapshot.views;
-	if (viewsDiff === 0) return 72;
-	const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
-	if (speedPerDay < 6) return 36;
-	if (speedPerDay < 120) return 24;
-	if (speedPerDay < 320) return 12;
-	return 6;
-};
-
 export const regularSnapshotsWorker = async (_job: Job) => {
 	const client = await db.connect();
 	const startedAt = Date.now();
@@ -178,72 +154,6 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 	}
 };

-export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const dataMap: { [key: number]: number } = job.data.map;
-	const ids = Object.keys(dataMap).map((id) => Number(id));
-	const aidsToFetch: number[] = [];
-	const client = await db.connect();
-	try {
-		for (const id of ids) {
-			const aid = Number(dataMap[id]);
-			const exists = await snapshotScheduleExists(client, id);
-			if (!exists) {
-				continue;
-			}
-			aidsToFetch.push(aid);
-		}
-		const data = await bulkGetVideoStats(aidsToFetch);
-		if (typeof data === "number") {
-			await bulkSetSnapshotStatus(client, ids, "failed");
-			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
-			return `GET_BILI_STATUS_${data}`;
-		}
-		for (const video of data) {
-			const aid = video.id;
-			const stat = video.cnt_info;
-			const views = stat.play;
-			const danmakus = stat.danmaku;
-			const replies = stat.reply;
-			const likes = stat.thumb_up;
-			const coins = stat.coin;
-			const shares = stat.share;
-			const favorites = stat.collect;
-			const query: string = `
-				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
-				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
-			`;
-			await client.queryObject(
-				query,
-				[aid, views, danmakus, replies, likes, coins, shares, favorites],
-			);
-
-			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
-		}
-		await bulkSetSnapshotStatus(client, ids, "completed");
-		for (const aid of aidsToFetch) {
-			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
-			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
-		}
-		return `DONE`;
-	} catch (e) {
-		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
-			logger.warn(
-				`No available proxy for bulk request now.`,
-				"mq",
-				"fn:takeBulkSnapshotForVideosWorker",
-			);
-			await bulkSetSnapshotStatus(client, ids, "completed");
-			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
-			return;
-		}
-		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
-		await bulkSetSnapshotStatus(client, ids, "failed");
-	} finally {
-		client.release();
-	}
-};
-
 export const takeSnapshotForVideoWorker = async (job: Job) => {
 	const id = job.data.id;
 	const aid = Number(job.data.aid);
diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts
new file mode 100644
index 0000000..f32de60
--- /dev/null
+++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts
@@ -0,0 +1,79 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { db } from "db/init.ts";
+import {
+	bulkScheduleSnapshot,
+	bulkSetSnapshotStatus,
+	scheduleSnapshot,
+	snapshotScheduleExists,
+} from "db/snapshotSchedule.ts";
+import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
+import logger from "log/logger.ts";
+import { NetSchedulerError } from "@core/net/delegate.ts";
+import { HOUR, MINUTE, SECOND } from "@std/datetime";
+import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
+
+export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
+	const dataMap: { [key: number]: number } = job.data.map;
+	const ids = Object.keys(dataMap).map((id) => Number(id));
+	const aidsToFetch: number[] = [];
+	const client = await db.connect();
+	try {
+		for (const id of ids) {
+			const aid = Number(dataMap[id]);
+			const exists = await snapshotScheduleExists(client, id);
+			if (!exists) {
+				continue;
+			}
+			aidsToFetch.push(aid);
+		}
+		const data = await bulkGetVideoStats(aidsToFetch);
+		if (typeof data === "number") {
+			await bulkSetSnapshotStatus(client, ids, "failed");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
+			return `GET_BILI_STATUS_${data}`;
+		}
+		for (const video of data) {
+			const aid = video.id;
+			const stat = video.cnt_info;
+			const views = stat.play;
+			const danmakus = stat.danmaku;
+			const replies = stat.reply;
+			const likes = stat.thumb_up;
+			const coins = stat.coin;
+			const shares = stat.share;
+			const favorites = stat.collect;
+			const query: string = `
+				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
+				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+			`;
+			await client.queryObject(
+				query,
+				[aid, views, danmakus, replies, likes, coins, shares, favorites],
+			);
+
+			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
+		}
+		await bulkSetSnapshotStatus(client, ids, "completed");
+		for (const aid of aidsToFetch) {
+			const interval = await getRegularSnapshotInterval(client, aid);
+			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
+		}
+		return `DONE`;
+	} catch (e) {
+		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
+			logger.warn(
+				`No available proxy for bulk request now.`,
+				"mq",
+				"fn:takeBulkSnapshotForVideosWorker",
+			);
+			await bulkSetSnapshotStatus(client, ids, "completed");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
+			return;
+		}
+		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
+		await bulkSetSnapshotStatus(client, ids, "failed");
+	} finally {
+		client.release();
+	}
+};
diff --git a/packages/crawler/mq/executors.ts b/packages/crawler/mq/executors.ts
deleted file mode 100644
index 1e486e1..0000000
--- a/packages/crawler/mq/executors.ts
+++ /dev/null
@@ -1 +0,0 @@
-export * from "mq/exec/getLatestVideos.ts";
diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts
index 9db7afc..4bffb4d 100644
--- a/packages/crawler/mq/init.ts
+++ b/packages/crawler/mq/init.ts
@@ -1,4 +1,4 @@
-import { MINUTE, SECOND } from "$std/datetime/constants.ts";
+import { MINUTE, SECOND } from "@std/datetime";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts
index e2f59e0..554874d 100644
--- a/packages/crawler/mq/scheduling.ts
+++ b/packages/crawler/mq/scheduling.ts
@@ -8,7 +8,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db/snapshotSchedule.ts";
 import { truncate } from "utils/truncate.ts";
 import { closetMilestone } from "./exec/snapshotTick.ts";
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { HOUR, MINUTE } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE } from "@std/datetime";

 const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
diff --git a/packages/crawler/mq/task/collectSongs.ts b/packages/crawler/mq/task/collectSongs.ts
index 389ca06..ca186f2 100644
--- a/packages/crawler/mq/task/collectSongs.ts
+++ b/packages/crawler/mq/task/collectSongs.ts
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
 import logger from "log/logger.ts";
 import { scheduleSnapshot } from "db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
+import { MINUTE } from "@std/datetime";

 export async function collectSongs(client: Client) {
 	const aids = await getNotCollectedSongs(client);
diff --git a/packages/crawler/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts
index fa5dd2f..72addf5 100644
--- a/packages/crawler/mq/task/getVideoDetails.ts
+++ b/packages/crawler/mq/task/getVideoDetails.ts
@@ -4,7 +4,7 @@ import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
 import logger from "log/logger.ts";
 import { ClassifyVideoQueue } from "mq/index.ts";
 import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
-import { HOUR, SECOND } from "$std/datetime/constants.ts";
+import { HOUR, SECOND } from "@std/datetime";

 export async function insertVideoInfo(client: Client, aid: number) {
 	const videoExists = await videoExistsInAllData(client, aid);
diff --git a/packages/crawler/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts
index d8b3993..47f8bcc 100644
--- a/packages/crawler/mq/task/queueLatestVideo.ts
+++ b/packages/crawler/mq/task/queueLatestVideo.ts
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
 import { videoExistsInAllData } from "db/allData.ts";
 import { sleep } from "utils/sleep.ts";
-import { SECOND } from "$std/datetime/constants.ts";
+import { SECOND } from "@std/datetime";
 import logger from "log/logger.ts";
 import { LatestVideosQueue } from "mq/index.ts";
diff --git a/packages/crawler/mq/task/regularSnapshotInterval.ts b/packages/crawler/mq/task/regularSnapshotInterval.ts
new file mode 100644
index 0000000..84d4b0b
--- /dev/null
+++ b/packages/crawler/mq/task/regularSnapshotInterval.ts
@@ -0,0 +1,22 @@
+import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { HOUR } from "@std/datetime";
+
+export const getRegularSnapshotInterval = async (client: Client, aid: number) => {
+	const now = Date.now();
+	const date = new Date(now - 24 * HOUR);
+	let oldSnapshot = await findSnapshotBefore(client, aid, date);
+	if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
+	const latestSnapshot = await getLatestSnapshot(client, aid);
+	if (!oldSnapshot || !latestSnapshot) return 0;
+	if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
+	const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
+	if (hoursDiff < 8) return 24;
+	const viewsDiff = latestSnapshot.views - oldSnapshot.views;
+	if (viewsDiff === 0) return 72;
+	const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
+	if (speedPerDay < 6) return 36;
+	if (speedPerDay < 120) return 24;
+	if (speedPerDay < 320) return 12;
+	return 6;
+};
diff --git a/packages/crawler/net/getVideoInfo.ts b/packages/crawler/net/getVideoInfo.ts
index 227559d..ea87918 100644
--- a/packages/crawler/net/getVideoInfo.ts
+++ b/packages/crawler/net/getVideoInfo.ts
@@ -48,4 +48,4 @@ export async function getVideoInfoByBV(bvid: string, task: string): Promise {
logger.log("SIGINT Received: Shutting down workers...", "mq");