fix: incorrectly ignored type when collecting videos for archive snapshots

This commit is contained in:
alikia2x (寒寒) 2025-06-06 16:52:27 +08:00
parent c0340677a1
commit 1e8d28e194
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
10 changed files with 410 additions and 613 deletions

496
bun.lock

File diff suppressed because it is too large Load Diff

View File

@ -10,12 +10,14 @@
"packages/crawler"
],
"dependencies": {
"arg": "^5.0.2",
"postgres": "^3.4.5"
},
"devDependencies": {
"@types/bun": "^1.2.15",
"prettier": "^3.5.3",
"vite-tsconfig-paths": "^5.1.4",
"vitest": "^3.1.2",
"vitest-tsconfig-paths": "^3.4.1",
"prettier": "^3.5.3"
"vitest-tsconfig-paths": "^3.4.1"
}
}

View File

@ -19,8 +19,8 @@ export async function refreshSnapshotWindowCounts(sql: Psql, redisClient: Redis)
const startTime = now.getTime();
const result = await sql<{ window_start: Date; count: number }[]>`
SELECT
date_trunc('hour', started_at) +
SELECT
date_trunc('hour', started_at) +
(EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
COUNT(*) AS count
FROM snapshot_schedule
@ -56,8 +56,8 @@ async function getWindowCount(redisClient: Redis, offset: number): Promise<numbe
export async function snapshotScheduleExists(sql: Psql, id: number) {
const rows = await sql<{ id: number }[]>`
SELECT id
FROM snapshot_schedule
SELECT id
FROM snapshot_schedule
WHERE id = ${id}
`;
return rows.length > 0;
@ -65,9 +65,9 @@ export async function snapshotScheduleExists(sql: Psql, id: number) {
export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) {
const rows = await sql<{ status: string }[]>`
SELECT status FROM snapshot_schedule
SELECT status FROM snapshot_schedule
WHERE aid = ${aid}
AND (status = 'pending' OR status = 'processing')
AND (status = 'pending' OR status = 'processing')
AND type = ${type}
`;
return rows.length > 0;
@ -76,7 +76,7 @@ export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, typ
export async function videoHasProcessingSchedule(sql: Psql, aid: number) {
const rows = await sql<{ status: string }[]>`
SELECT status
FROM snapshot_schedule
FROM snapshot_schedule
WHERE aid = ${aid}
AND status = 'processing'
`;
@ -88,7 +88,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n
SELECT aid
FROM snapshot_schedule
WHERE aid = ANY(${aids})
AND status != 'processing'
AND status != 'processing'
GROUP BY aid
`;
return rows.map((row) => Number(row.aid));
@ -134,8 +134,8 @@ export async function findSnapshotBefore(sql: Psql, aid: number, targetTime: Dat
export async function hasAtLeast2Snapshots(sql: Psql, aid: number) {
const res = await sql<{ count: number }[]>`
SELECT COUNT(*)
FROM video_snapshot
SELECT COUNT(*)
FROM video_snapshot
WHERE aid = ${aid}
`;
return res[0].count >= 2;
@ -143,10 +143,10 @@ export async function hasAtLeast2Snapshots(sql: Psql, aid: number) {
export async function getLatestSnapshot(sql: Psql, aid: number): Promise<Snapshot | null> {
const res = await sql<{ created_at: string; views: number }[]>`
SELECT created_at, views
FROM video_snapshot
SELECT created_at, views
FROM video_snapshot
WHERE aid = ${aid}
ORDER BY created_at DESC
ORDER BY created_at DESC
LIMIT 1
`;
if (res.length === 0) return null;
@ -209,11 +209,11 @@ export async function scheduleSnapshot(
}
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return sql`
INSERT INTO snapshot_schedule
(aid, type, started_at)
INSERT INTO snapshot_schedule
(aid, type, started_at)
VALUES (
${aid},
${type},
${aid},
${type},
${adjustedTime.toISOString()}
)
`;
@ -331,7 +331,7 @@ export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, ty
const rows = await sql<{ aid: string }[]>`
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON
LEFT JOIN snapshot_schedule ss ON
s.aid = ss.aid AND
(ss.status = 'pending' OR ss.status = 'processing') AND
ss.type = ${type}
@ -339,13 +339,3 @@ export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, ty
`;
return rows.map((r) => Number(r.aid));
}
/**
 * List the aids in `bilibili_metadata` that have no `pending` or `processing`
 * row in `snapshot_schedule`. The schedule `type` is not part of the join
 * condition, so an active schedule of any type excludes the video.
 *
 * @param psql - tagged-template SQL client used to run the query
 * @returns the matching aids, each coerced with `Number()`
 */
export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) {
  const toAid = (row: { aid: number }) => Number(row.aid);
  const unscheduled = await psql<{ aid: number }[]>`
SELECT s.aid
FROM bilibili_metadata s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`;
  return unscheduled.map((row) => toAid(row));
}

View File

@ -1,5 +1,5 @@
import { Job } from "bullmq";
import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { getLatestVideoSnapshot } from "db/snapshot.ts";
@ -31,7 +31,7 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
return;
}
await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
const aids = await getAllVideosWithoutActiveSnapshotSchedule(sql);
const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "archive");
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);

View File

@ -1,121 +0,0 @@
import { Database } from "jsr:@db/sqlite@0.12";
import { ensureDir } from "https://deno.land/std@0.113.0/fs/mod.ts";
// Constants
// Default retry budget used by fetchData when the caller does not pass one.
const MAX_RETRIES = 3;
// Listing endpoint; fetchData appends the page number to the trailing `pn=`.
const API_URL = "https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=50&pn=";
const DATABASE_PATH = "./data/main.db";
const LOG_DIR = "./logs/bili-info-crawl";
// One log file per run. Date.now() / 1000 is not rounded, so the file name
// carries a fractional-seconds timestamp.
const LOG_FILE = `${LOG_DIR}/run-${Date.now() / 1000}.log`;
// Open the database
const db = new Database(DATABASE_PATH, { int64: true });
// Set up logging: make sure LOG_DIR exists, open LOG_FILE for appending and
// wrap console.log / console.error / console.warn so that every message is
// forwarded to the original console method and also appended to the log file.
async function setupLogging() {
  await ensureDir(LOG_DIR);
  const sink = await Deno.open(LOG_FILE, {
    write: true,
    create: true,
    append: true,
  });

  // deno-lint-ignore no-explicit-any
  type ConsoleFn = (...args: any[]) => void;

  // Objects are serialised as JSON; every other value is left for join() to stringify.
  const render = (value: unknown) => (typeof value === "object" ? JSON.stringify(value) : value);

  function tee(original: ConsoleFn): ConsoleFn {
    return (...parts) => {
      const line = parts.map((part) => render(part)).join(" ");
      original(line);
      // The write promise is intentionally not awaited, matching a synchronous console API.
      sink.write(new TextEncoder().encode(line + "\n"));
    };
  }

  for (const level of ["log", "error", "warn"] as const) {
    console[level] = tee(console[level]);
  }
}
// Row shape that getLastUpdate casts its `metadata` table query result to.
interface Metadata {
key: string;
value: string;
}
// Get the time of the last update from the `fetchAid-lastUpdate` metadata row.
// Falls back to the Unix epoch when the query yields no row.
function getLastUpdate(): Date {
  const statement = db.prepare(
    "SELECT value FROM metadata WHERE key = 'fetchAid-lastUpdate'",
  );
  const row = statement.get() as Metadata;
  if (!row) {
    return new Date(0);
  }
  return new Date(row.value as string);
}
// Store the current time (ISO 8601) as the new `fetchAid-lastUpdate` value.
// This is an UPDATE only: it changes nothing if the metadata row does not exist.
function updateLastUpdate() {
  const timestamp = new Date().toISOString();
  const statement = db.prepare("UPDATE metadata SET value = ? WHERE key = 'fetchAid-lastUpdate'");
  statement.run(timestamp);
}
// Helper: fetch listing page `pn` and return the parsed JSON body.
// A failed attempt (network error, non-2xx status, invalid JSON) is retried
// after a one-second pause, up to `retries` additional times; once the budget
// is exhausted the last error is rethrown.
// deno-lint-ignore no-explicit-any
async function fetchData(pn: number, retries = MAX_RETRIES): Promise<any> {
  let attemptsLeft = retries;
  while (true) {
    try {
      const response = await fetch(`${API_URL}${pn}`);
      if (!response.ok) throw new Error(`HTTP error! status: ${response.status}`);
      return await response.json();
    } catch (error) {
      if (!(attemptsLeft > 0)) {
        throw error;
      }
      await new Promise((resolve) => setTimeout(resolve, 1000));
      attemptsLeft = attemptsLeft - 1;
    }
  }
}
// Insert an aid into the crawl table with status 'pending'.
// INSERT OR IGNORE leaves an existing row for the same aid untouched.
function insertAid(aid: number) {
  const insertStatement = db.prepare(
    "INSERT OR IGNORE INTO bili_info_crawl (aid, status) VALUES (?, 'pending')",
  );
  insertStatement.run(aid);
}
// Main function: walk the listing page by page and queue every archive that
// was published after the last recorded update, then record the new update
// time and close the database.
async function main() {
  await setupLogging();

  let pn = 1;
  let shouldContinue = true;
  const lastUpdate = getLastUpdate();

  while (shouldContinue) {
    try {
      const data = await fetchData(pn);
      const archives = data.data.archives;

      // An empty (or missing) archive list means we have paged past the end of
      // the listing. Without this check the loop would keep requesting empty
      // pages forever, e.g. on a first run where lastUpdate is the epoch.
      if (!Array.isArray(archives) || archives.length === 0) {
        break;
      }

      for (const archive of archives) {
        // `pubdate` is multiplied by 1000 to build a Date (i.e. treated as seconds).
        const pubTime = new Date(archive.pubdate * 1000);
        if (pubTime > lastUpdate) {
          insertAid(archive.aid);
        } else {
          // Reached an archive that is not newer than the last update: stop paging.
          shouldContinue = false;
          break;
        }
      }

      // Log the page that was actually fetched, then advance.
      console.log(`Fetched page ${pn}`);
      pn++;
    } catch (error) {
      // Best effort: `pn` is unchanged here, so the same page is requested
      // again on the next iteration.
      console.error(`Error fetching data for pn=${pn}: ${error}`);
    }
  }

  // Update the last update time
  updateLastUpdate();

  // Close the database
  db.close();
}
// Run the main function; a rejection is reported through console.error.
main().catch(console.error);

View File

@ -1,223 +0,0 @@
import path from "node:path";
import { Database } from "jsr:@db/sqlite@0.12";
import { getBiliBiliVideoInfo } from "./videoInfo.ts";
import { ensureDir } from "https://deno.land/std@0.113.0/fs/mod.ts";
// Text file with one aid per line; only read when shouldReadTextFile is true.
const aidPath = "./data/2025010104_c30_aids.txt";
const db = new Database("./data/main.db", { int64: true });
// Region names handed to getBiliBiliVideoInfo, selected by
// `processedAids % regions.length` in insertAidsToDB.
const regions = [
"shanghai",
"hangzhou",
"qingdao",
"beijing",
"zhangjiakou",
"chengdu",
"shenzhen",
"hohhot",
];
const logDir = "./logs/bili-info-crawl";
// One log file per run; Date.now() / 1000 is not rounded, so the name has a fractional part.
const logFile = path.join(logDir, `run-${Date.now() / 1000}.log`);
// When true, insertAidsToDB first imports aids from aidPath.
const shouldReadTextFile = false;
// Time units in milliseconds. The plural names are aliases so that
// expressions such as `5 * MINUTES` read naturally.
const SECOND = 1000;
const SECONDS = SECOND;
const MINUTE = 60 * SECONDS;
const MINUTES = MINUTE;
// NOTE(review): presumably each region proxies through its own IP, which is why
// the per-window budgets below are multiplied by the region count — confirm.
const IPs = regions.length;
// Sliding-window limits checked by isRateLimited: a request is held back when
// any window already contains `maxRequests` timestamps.
const rateLimits = [
{ window: 5 * MINUTES, maxRequests: 160 * IPs },
{ window: 30 * SECONDS, maxRequests: 20 * IPs },
{ window: 1.2 * SECOND, maxRequests: 1 * IPs },
];
// Date.now() timestamps of dispatched requests, appended by insertAidsToDB.
const requestQueue: number[] = [];
// Mirror console output into the run's log file: ensure logDir exists, open
// logFile in append mode and replace console.log / console.error /
// console.warn with wrappers that print the message and also write it,
// newline-terminated, to the file.
async function setupLogging() {
  await ensureDir(logDir);
  const logStream = await Deno.open(logFile, {
    write: true,
    create: true,
    append: true,
  });

  // deno-lint-ignore no-explicit-any
  type ConsoleMethod = (...args: any[]) => void;

  // deno-lint-ignore no-explicit-any
  function formatMessage(args: any[]): string {
    return args
      .map((arg) => {
        // Objects are rendered as JSON; everything else is stringified by join().
        if (typeof arg === "object") {
          return JSON.stringify(arg);
        }
        return arg;
      })
      .join(" ");
  }

  function wrap(originalConsole: ConsoleMethod): ConsoleMethod {
    return (...args) => {
      const message = formatMessage(args);
      originalConsole(message);
      // Fire-and-forget write; the returned promise is not awaited.
      logStream.write(new TextEncoder().encode(message + "\n"));
    };
  }

  console.log = wrap(console.log);
  console.error = wrap(console.error);
  console.warn = wrap(console.warn);
}
// Report whether dispatching another request right now would exceed any of
// the sliding-window limits in `rateLimits`.
//
// As a side effect, timestamps older than the widest window are dropped from
// the front of `requestQueue`. They can never count towards any window again,
// and without pruning the queue would grow for the whole run and be rescanned
// in full on every call.
function isRateLimited(): boolean {
  const now = Date.now();

  const widestWindow = Math.max(...rateLimits.map(({ window }) => window));
  const cutoff = now - widestWindow;
  const firstLive = requestQueue.findIndex((timestamp) => timestamp >= cutoff);
  if (firstLive === -1) {
    // Every recorded request is older than the widest window.
    requestQueue.length = 0;
  } else if (firstLive > 0) {
    requestQueue.splice(0, firstLive);
  }

  return rateLimits.some(({ window, maxRequests }) => {
    const windowStart = now - window;
    const requestsInWindow = requestQueue.filter((timestamp) => timestamp >= windowStart).length;
    return requestsInWindow >= maxRequests;
  });
}
// Import aids from the text file at `aidPath` (one aid per line) and insert
// those not yet present in `bili_info_crawl` with status 'pending'.
async function readFromText() {
  const aidRawcontent = await Deno.readTextFile(aidPath);
  const aids = aidRawcontent
    .split("\n")
    // Always parse as base 10, and drop anything that is not a number
    // (blank lines, a stray "\r", headers) so NaN never reaches the database.
    .map((line) => parseInt(line, 10))
    .filter((aid) => !Number.isNaN(aid));

  // if (!db.prepare("SELECT COUNT(*) FROM bili_info_crawl").get()) {
  //   const insertStmt = db.prepare("INSERT OR IGNORE INTO bili_info_crawl (aid, status) VALUES (?, 'pending')");
  //   aids.forEach((aid) => insertStmt.run(aid));
  // }

  // Query the aids that already exist in the database
  const existingAids = db
    .prepare("SELECT aid FROM bili_info_crawl")
    .all()
    .map((row) => row.aid);
  console.log(existingAids.length);

  // Convert existingAids to a Set for faster lookups
  // NOTE(review): the database is opened with `int64: true`; if that makes some
  // aids come back as bigint they will not match the parsed numbers here —
  // confirm. The INSERT OR IGNORE below makes such a miss harmless.
  const existingAidsSet = new Set(existingAids);

  // Find the entries of `aids` that are not in the database yet
  const newAids = aids.filter((aid) => !existingAidsSet.has(aid));

  // Insert the new entries
  const insertStmt = db.prepare(
    "INSERT OR IGNORE INTO bili_info_crawl (aid, status) VALUES (?, 'pending')",
  );
  newAids.forEach((aid) => insertStmt.run(aid));
}
// Crawl video info for every aid whose status is 'pending' or 'failed' and
// store the outcome through updateAidStatus.
//
// The function returns as soon as the polling timer is installed; the actual
// work happens in the setInterval callback below.
async function insertAidsToDB() {
if (shouldReadTextFile) {
await readFromText();
}
// Work list: aids still waiting to be fetched, consumed from the front.
const aidsInDB = db
.prepare(
"SELECT aid FROM bili_info_crawl WHERE status = 'pending' OR status = 'failed'",
)
.all()
.map((row) => row.aid) as number[];
const totalAids = aidsInDB.length;
let processedAids = 0;
const startTime = Date.now();
// Fetch one aid and record the result:
//   null response        -> 'failed'
//   response code === 0  -> 'success' (stores bvid and the JSON-encoded data)
//   any other code       -> 'error'   (stores the raw response)
//   thrown exception     -> 'failed'
const processAid = async (aid: number) => {
try {
// The region is picked from the number of *completed* requests, which only
// advances in the `finally` below, so requests in flight at the same time
// can be sent to the same region.
const res = await getBiliBiliVideoInfo(
aid,
regions[processedAids % regions.length],
);
if (res === null) {
updateAidStatus(aid, "failed");
} else {
// NOTE(review): this assumes `res` is a JSON string. If getBiliBiliVideoInfo
// returns an already-parsed object, JSON.parse would throw and the aid would
// be marked 'failed' by the catch block — confirm the return shape.
const rawData = JSON.parse(res);
if (rawData.code === 0) {
updateAidStatus(
aid,
"success",
rawData.data.View.bvid,
JSON.stringify(rawData.data),
);
} else {
updateAidStatus(aid, "error", undefined, res);
}
}
} catch (error) {
console.error(`Error updating aid ${aid}: ${error}`);
updateAidStatus(aid, "failed");
} finally {
// Count the aid as processed whatever the outcome, then report progress.
processedAids++;
logProgress(aid, processedAids, totalAids, startTime);
}
};
// Poll every 50 ms. setInterval does not wait for the async callback to
// settle, so a new tick can dispatch another aid while earlier requests are
// still awaiting; isRateLimited() is what bounds the dispatch rate.
const interval = setInterval(async () => {
if (aidsInDB.length === 0) {
// The work list is drained. Requests dispatched on earlier ticks may still
// be in flight when this message is printed.
clearInterval(interval);
console.log("All aids processed.");
return;
}
if (!isRateLimited()) {
const aid = aidsInDB.shift();
if (aid !== undefined) {
// Record the dispatch time before the request so that later ticks see it
// in the rate-limit windows.
requestQueue.push(Date.now());
await processAid(aid);
}
}
}, 50);
console.log("Starting to process aids...");
}
// Update the crawl row for `aid`: always sets `status` and `timestamp`
// (Date.now() / 1000), and additionally sets `bvid` and/or `data` when a
// truthy value is supplied for them.
function updateAidStatus(
  aid: number,
  status: string,
  bvid?: string,
  data?: string,
) {
  // Optional SET fragments; each one that is present adds one placeholder.
  const bvidClause = bvid ? "bvid = ?," : "";
  const dataClause = data ? "data = ?," : "";
  const statement = db.prepare(`
UPDATE bili_info_crawl
SET status = ?,
${bvidClause}
${dataClause}
timestamp = ?
WHERE aid = ?
`);

  // Bind values in the same order as the placeholders above.
  const bindings: (string | number)[] = [status];
  if (bvid) {
    bindings.push(bvid);
  }
  if (data) {
    bindings.push(data);
  }
  bindings.push(Date.now() / 1000, aid);

  statement.run(...bindings);
}
// Print a one-line progress report for the aid that just finished:
// processed/total, percentage, elapsed time as HH:MM:SS and an ETA
// extrapolated from the average time per processed aid.
function logProgress(
  aid: number,
  processedAids: number,
  totalAids: number,
  startTime: number,
) {
  const pad2 = (value: number) => value.toString().padStart(2, "0");

  const elapsedMs = Date.now() - startTime;
  const elapsedSec = Math.floor(elapsedMs / 1000);
  const elapsedMin = Math.floor(elapsedSec / 60);
  const elapsedHr = Math.floor(elapsedMin / 60);

  // ETA = remaining aids x average milliseconds spent per processed aid.
  const etaMs = (totalAids - processedAids) * (elapsedMs / processedAids);
  const etaMin = Math.floor(Math.floor(etaMs / 1000) / 60);
  const etaHr = Math.floor(etaMin / 60);

  const percent = ((processedAids / totalAids) * 100).toFixed(2);
  const elapsedClock = [elapsedHr, elapsedMin % 60, elapsedSec % 60]
    .map((part) => pad2(part))
    .join(":");
  const etaClock = `${etaHr}h${pad2(etaMin % 60)}m`;

  const progress = `${processedAids}/${totalAids}, ${percent}%, elapsed ${elapsedClock}, ETA ${etaClock}`;
  console.log(`Updated aid ${aid}, ${progress}`);
}
// Redirect console output to the log file first, then start the crawl.
// insertAidsToDB() is not awaited; it installs an interval timer that does the work.
await setupLogging();
insertAidsToDB();

View File

@ -1,60 +0,0 @@
/**
 * Request the video detail endpoint for one video through the regional proxy.
 *
 * @param bvidORaid - a number is sent as the `aid` query parameter, a string
 *   as `bvid`; falsy values (undefined, 0, NaN, "") are rejected
 * @param region - region name forwarded to proxyRequestWithRegion
 * @returns whatever proxyRequestWithRegion resolves to, or `null` when no
 *   usable identifier was given
 */
export async function getBiliBiliVideoInfo(
  bvidORaid?: string | number,
  region: string = "hangzhou",
) {
  const endpoint = new URL("https://api.bilibili.com/x/web-interface/view/detail");

  if (typeof bvidORaid === "number" && bvidORaid) {
    endpoint.searchParams.append("aid", bvidORaid.toString());
  } else if (typeof bvidORaid === "string" && bvidORaid) {
    endpoint.searchParams.append("bvid", bvidORaid);
  } else {
    return null;
  }

  return await proxyRequestWithRegion(endpoint.toString(), region);
}
/**
 * Invoke the `proxy-<region>` function through the `aliyun` CLI, asking it to
 * fetch `url`, and return the JSON-parsed `body` of the function's reply.
 *
 * @returns the parsed body, or `null` when the reply's `statusCode` is not
 *   200 or when the CLI output / body cannot be parsed as JSON
 */
async function proxyRequestWithRegion(
  url: string,
  region: string,
): Promise<any | null> {
  const decoder = new TextDecoder();
  // aliyun configure set --access-key-id $ALIYUN_AK --access-key-secret $ALIYUN_SK --region cn-shenzhen --profile CVSA-shenzhen --mode AK
  const command = new Deno.Command("aliyun", {
    args: [
      "fc",
      "POST",
      `/2023-03-30/functions/proxy-${region}/invocations`,
      "--qualifier",
      "LATEST",
      "--header",
      "Content-Type=application/json;x-fc-invocation-type=Sync;x-fc-log-type=None;",
      "--body",
      JSON.stringify({ url: url }),
      "--profile",
      `CVSA-${region}`,
    ],
  });
  // Spawning happens outside the try block, so a failure to run the CLI rejects.
  const result = await command.output();

  try {
    const envelope = JSON.parse(decoder.decode(result.stdout));
    if (envelope.statusCode === 200) {
      return JSON.parse(envelope.body);
    }
    console.error(
      `Error proxying request ${url} to ${region} , statusCode: ${envelope.statusCode}`,
    );
    return null;
  } catch (e) {
    console.error(`Error proxying request ${url} to ${region}: ${e}`);
    return null;
  }
}

32
src/metadataArchive.ts Normal file
View File

@ -0,0 +1,32 @@
import arg from "arg";
//import { getVideoDetails } from "@crawler/net/getVideoDetails";
import logger from "@core/log/logger";
/**
 * Log a fatal error and terminate the process with a non-zero exit status.
 *
 * Declared as a function with an explicit `never` return type so that
 * TypeScript narrows values checked before the call (no `!` needed later).
 */
function quit(reason: string): never {
  logger.error(reason);
  process.exit(1);
}

const args = arg({
  "--aids": String // --aids <file_path> or --aids=<file_path>
});

const aidsFileName = args["--aids"];
if (!aidsFileName) {
  quit("Missing --aids <file_path>");
}

const aidsFile = Bun.file(aidsFileName);
const fileExists = await aidsFile.exists();
if (!fileExists) {
  // Report the path that was given, not the file handle object.
  quit(`${aidsFileName} does not exist.`);
}

// One aid per line; lines that do not start with a base-10 integer are skipped.
const aidsText = await aidsFile.text();
const aids = aidsText
  .split("\n")
  .map((line) => parseInt(line, 10))
  .filter((num) => !Number.isNaN(num));

logger.log(`Read ${aids.length} aids.`);

35
tsconfig.json Normal file
View File

@ -0,0 +1,35 @@
{
"include": ["**/*.ts"],
"compilerOptions": {
"baseUrl": ".",
"paths": {
"@core/*": ["./packages/core/*"],
"@crawler/*": ["./packages/crawler/*"]
},
// Environment setup & latest features
"lib": ["ESNext"],
"target": "ESNext",
"module": "Preserve",
"moduleDetection": "force",
"jsx": "react-jsx",
"allowJs": true,
// Bundler mode
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"verbatimModuleSyntax": true,
"noEmit": true,
// Best practices
"strict": true,
"skipLibCheck": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
// Some stricter flags (disabled by default)
"noUnusedLocals": false,
"noUnusedParameters": false,
"noPropertyAccessFromIndexSignature": false
}
}