From 4851a69b9101aab399ed5caed62933ca2fe68faa Mon Sep 17 00:00:00 2001
From: alikia2x
Date: Wed, 19 Nov 2025 02:28:05 +0800
Subject: [PATCH] add: more new snapshots for new songs, script for fullSnapshot

---
 .../{crawler => core}/net/getVideoDetails.ts |   0
 packages/crawler/mq/exec/classifyVideo.ts     |   3 +
 packages/crawler/mq/exec/getVideoInfo.ts      |   2 +-
 .../app/components/SearchResults.tsx          |   2 +-
 src/fixPubDate.ts                             |   6 +-
 src/fullSnapshot.ts                           | 176 ++++++++++++++++++
 6 files changed, 183 insertions(+), 6 deletions(-)
 rename packages/{crawler => core}/net/getVideoDetails.ts (100%)
 create mode 100644 src/fullSnapshot.ts

diff --git a/packages/crawler/net/getVideoDetails.ts b/packages/core/net/getVideoDetails.ts
similarity index 100%
rename from packages/crawler/net/getVideoDetails.ts
rename to packages/core/net/getVideoDetails.ts
diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts
index c57c332..b361136 100644
--- a/packages/crawler/mq/exec/classifyVideo.ts
+++ b/packages/crawler/mq/exec/classifyVideo.ts
@@ -35,6 +35,9 @@ export const classifyVideoWorker = async (job: Job) => {
 
 	const exists = await aidExistsInSongs(sql, aid);
 	if (!exists && label !== 0) {
+		await scheduleSnapshot(sql, aid, "new", Date.now() + 1.5 * MINUTE, true);
+		await scheduleSnapshot(sql, aid, "new", Date.now() + 3 * MINUTE, true);
+		await scheduleSnapshot(sql, aid, "new", Date.now() + 5 * MINUTE, true);
 		await scheduleSnapshot(sql, aid, "new", Date.now() + 10 * MINUTE, true);
 		await insertIntoSongs(aid);
 	}
diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts
index 7546ec8..7538f3b 100644
--- a/packages/crawler/mq/exec/getVideoInfo.ts
+++ b/packages/crawler/mq/exec/getVideoInfo.ts
@@ -1,5 +1,5 @@
 import { Job } from "bullmq";
-import { getVideoDetails } from "net/getVideoDetails";
+import { getVideoDetails } from "@core/net/getVideoDetails";
 import logger from "@core/log";
 import { ClassifyVideoQueue, latestVideosEventsProducer } from "mq/index";
 import {
diff --git a/packages/temp_frontend/app/components/SearchResults.tsx b/packages/temp_frontend/app/components/SearchResults.tsx
index e657d93..7ac66e8 100644
--- a/packages/temp_frontend/app/components/SearchResults.tsx
+++ b/packages/temp_frontend/app/components/SearchResults.tsx
@@ -82,7 +82,7 @@ function SongResult({ result }: { result: Exclude["data"][nu 歌曲封面 )}
diff --git a/src/fixPubDate.ts b/src/fixPubDate.ts
index 295a016..f6f65ed 100644
--- a/src/fixPubDate.ts
+++ b/src/fixPubDate.ts
@@ -28,11 +28,9 @@ async function fixTimezoneError() {
 	const candidates = await pg`
 		SELECT aid, published_at
 		FROM
-			bilibili_metadata
+			songs
 		WHERE
-			published_at >= '2025-04-26'
-			AND published_at <= '2025-06-01'
-			AND status = 0
+			published_at <= '2000-01-01'
 	`;
 	const query = sqlite.query(`SELECT data FROM bili_info_crawl WHERE aid = $aid`);
 	for (const video of candidates) {
diff --git a/src/fullSnapshot.ts b/src/fullSnapshot.ts
new file mode 100644
index 0000000..904d86f
--- /dev/null
+++ b/src/fullSnapshot.ts
@@ -0,0 +1,176 @@
+import arg from "arg";
+import logger from "@core/log";
+import { Database } from "bun:sqlite";
+import { getVideoDetails } from "@core/net/getVideoDetails";
+import { sql } from "@core/index";
+
+/**
+ * Terminates the script. Logs `reason` and exits with status 1 when a reason
+ * is given; exits with status 0 otherwise.
+ */
+function quit(reason?: string): never {
+	if (reason) {
+		logger.error(reason);
+	}
+	process.exit(reason ? 1 : 0);
+}
+
+const args = arg({
+	"--db": String,
+	"--aids": String
+});
+
+const dbPath = args["--db"];
+if (!dbPath) {
+	quit("Missing --db ");
+}
+
+const sqlite = new Database(dbPath);
+const pg = sql;
+
+/**
+ * Resolves the aids to crawl. With --aids, reads one aid per line from that
+ * file and skips blank or non-integer lines (a trailing newline would
+ * otherwise become aid 0). Without --aids, returns every aid in
+ * bilibili_metadata.
+ */
+const getAids = async (): Promise<number[]> => {
+	const aidsFile = args["--aids"];
+	if (aidsFile) {
+		const text = await Bun.file(aidsFile).text();
+		return text
+			.split("\n")
+			.map((line) => line.trim())
+			.filter((line) => line !== "")
+			.map(Number)
+			.filter((aid) => Number.isSafeInteger(aid));
+	}
+	const rows = await sql<{ aid: number }[]>`SELECT aid FROM bilibili_metadata`;
+	return rows.map((row) => row.aid);
+};
+
+/**
+ * Adds every aid that is missing from the local bili_info_crawl table with
+ * status 'pending'.
+ */
+async function addCandidates() {
+	const aids = await getAids();
+
+	logger.log(`Retrieved ${aids.length} from production DB.`);
+
+	const existingRows = sqlite.prepare("SELECT aid FROM bili_info_crawl").all() as { aid: number }[];
+	logger.log(`We have ${existingRows.length} from local DB.`);
+
+	const existingAidsSet = new Set(existingRows.map((row) => row.aid));
+	const newAids = aids.filter((aid) => !existingAidsSet.has(aid));
+
+	// One parameterized INSERT run once per aid inside a single transaction.
+	// Preparing a string of concatenated INSERTs is not equivalent: SQLite's
+	// prepare compiles only the first statement of a multi-statement string,
+	// and the string is empty when there is nothing new to add.
+	const insert = sqlite.prepare("INSERT OR IGNORE INTO bili_info_crawl (aid, status) VALUES (?, 'pending')");
+	const insertAll = sqlite.transaction((pending: number[]) => {
+		for (const aid of pending) {
+			insert.run(aid);
+		}
+	});
+	insertAll(newAids);
+	logger.log(`Added ${newAids.length} to local DB.`);
+}
+
+/**
+ * Registers new candidates, then fetches details for every pending aid in
+ * groups of five concurrent requests and records the outcome of each.
+ */
+async function insertAidsToDB() {
+	await addCandidates();
+
+	const aidsInDB = sqlite
+		.prepare("SELECT aid FROM bili_info_crawl WHERE status = 'pending'")
+		.all()
+		.map((row: any) => row.aid) as number[];
+
+	const totalAids = aidsInDB.length;
+	let processedAids = 0;
+	const startTime = Date.now();
+
+	const processAid = async (aid: number) => {
+		try {
+			const res = await getVideoDetails(aid);
+			if (res === null) {
+				updateAidStatus(aid, "failed");
+			} else {
+				updateAidStatus(aid, "success", res.View.bvid, JSON.stringify(res));
+			}
+		} catch (error) {
+			logger.error(`Error updating aid ${aid}: ${error}`);
+			updateAidStatus(aid, "failed");
+		} finally {
+			processedAids++;
+			logProgress(aid, processedAids, totalAids, startTime);
+		}
+	};
+
+	const groupSize = 5;
+	const groups: number[][] = [];
+	for (let i = 0; i < totalAids; i += groupSize) {
+		groups.push(aidsInDB.slice(i, i + groupSize));
+	}
+
+	logger.log(`Processing ${totalAids} aids in ${groups.length} groups.`);
+
+	// Groups run one after another; the aids inside a group run concurrently.
+	for (const group of groups) {
+		await Promise.all(group.map((aid) => processAid(aid)));
+	}
+}
+
+/**
+ * Writes the crawl result for one aid. bvid and data are only updated when
+ * provided; timestamp is always set to the current time in seconds.
+ */
+function updateAidStatus(aid: number, status: string, bvid?: string, data?: string) {
+	const stmt = sqlite.prepare(`
+		UPDATE bili_info_crawl
+		SET status = ?,
+			${bvid ? "bvid = ?," : ""}
+			${data ? "data = ?," : ""}
+			timestamp = ?
+		WHERE aid = ?
+	`);
+	const params = [
+		status,
+		...(bvid ? [bvid] : []),
+		...(data ? [data] : []),
+		Date.now() / 1000,
+		aid
+	];
+	stmt.run(...params);
+}
+
+/**
+ * Logs how many aids have been processed, the elapsed time and an ETA
+ * extrapolated from the average time per processed aid.
+ */
+function logProgress(aid: number, processedAids: number, totalAids: number, startTime: number) {
+	const elapsedTime = Date.now() - startTime;
+	const elapsedSeconds = Math.floor(elapsedTime / 1000);
+	const elapsedMinutes = Math.floor(elapsedSeconds / 60);
+	const elapsedHours = Math.floor(elapsedMinutes / 60);
+
+	const remainingAids = totalAids - processedAids;
+	const averageTimePerAid = elapsedTime / processedAids;
+	const eta = remainingAids * averageTimePerAid;
+	const etaSeconds = Math.floor(eta / 1000);
+	const etaMinutes = Math.floor(etaSeconds / 60);
+	const etaHours = Math.floor(etaMinutes / 60);
+
+	const pad2 = (value: number) => value.toString().padStart(2, "0");
+	const percent = ((processedAids / totalAids) * 100).toFixed(2);
+	const elapsed = `${pad2(elapsedHours)}:${pad2(elapsedMinutes % 60)}:${pad2(elapsedSeconds % 60)}`;
+	const progress = `${processedAids}/${totalAids}, ${percent}%, elapsed ${elapsed}, ETA ${etaHours}h${pad2(etaMinutes % 60)}m`;
+	logger.log(`Updated aid ${aid}, ${progress}`);
+}
+
+await insertAidsToDB();
+quit();