update: the video list fetching workflow

This commit is contained in:
alikia2x (寒寒) 2025-02-08 23:15:47 +08:00
parent b21e6da07a
commit 0c589b5d20
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
14 changed files with 231 additions and 53 deletions

View File

@ -30,7 +30,8 @@
"$std/": "https://deno.land/std@0.216.0/",
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq",
"lib/": "./lib/"
"lib/": "./lib/",
"ioredis": "npm:ioredis"
},
"compilerOptions": {
"jsx": "react-jsx",

2
dev.ts
View File

@ -4,5 +4,7 @@ import dev from "$fresh/dev.ts";
import config from "./fresh.config.ts";
import "$std/dotenv/load.ts";
import { initMQ } from "lib/mq/init.ts";
await initMQ();
await dev(import.meta.url, "./main.ts", config);

View File

@ -13,3 +13,14 @@ export async function insertIntoAllData(client: Client, data: AllDataType) {
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
);
}
export async function getLatestVideoTimestampFromAllData(client: Client) {
return await client.queryObject<{ published_at: string }>("SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1")
.then((result) => {
const date = new Date(result.rows[0].published_at);
if (isNaN(date.getTime())) {
return null;
}
return date.getTime();
});
}

27
lib/db/init.ts Normal file
View File

@ -0,0 +1,27 @@
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"];
const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined);
if (unsetVars.length > 0) {
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
}
const databaseHost = Deno.env.get("DB_HOST")!;
const databaseName = Deno.env.get("DB_NAME")!;
const databaseUser = Deno.env.get("DB_USER")!;
const databasePassword = Deno.env.get("DB_PASSWORD")!;
const databasePort = Deno.env.get("DB_PORT")!;
const postgresConfig = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseName,
user: databaseUser,
password: databasePassword,
};
const pool = new Pool(postgresConfig, 4);
export const db = pool;

4
lib/db/redis.ts Normal file
View File

@ -0,0 +1,4 @@
import { Redis } from "ioredis";
export const redis = new Redis({ maxRetriesPerRequest: null });

44
lib/mq/executors.ts Normal file
View File

@ -0,0 +1,44 @@
import { Job } from "bullmq";
import { redis } from "lib/db/redis.ts";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/turncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
const LAST_EXECUTED_KEY = "job:insert-videos:last-executed";
const DELTA = 15 * SECOND;
const delayMap = [5, 10, 15, 30, 60, 60];
const setLastExecutedTimestamp = async () =>
await redis.set(LAST_EXECUTED_KEY, Date.now());
const addJobToQueue = (failedCount: number, delay: number) =>
MainQueue.add("getLatestVideos", { failedCount }, { delay });
export const insertVideosWorker = async (job: Job) => {
const failedCount = (job.data.failedCount ?? 0) as number;
const client = await db.connect();
const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY));
if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) {
await executeTask(client, failedCount);
return;
}
const diff = Date.now() - lastExecutedTimestamp;
if (diff < 5 * MINUTE) {
addJobToQueue(0, diff + DELTA);
return;
}
await executeTask(client, failedCount);
};
const executeTask = async (client: Client, failedCount: number) => {
const result = await insertLatestVideos(client);
await setLastExecutedTimestamp();
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
addJobToQueue(failedCount, delayMap[failedCount] * MINUTE);
};

View File

@ -1,2 +1,5 @@
import { Queue } from "bullmq";
const MainQueue = new Queue("cvsa");
export default MainQueue;

9
lib/mq/init.ts Normal file
View File

@ -0,0 +1,9 @@
import MainQueue from "lib/mq/index.ts";
async function configGetLatestVideos() {
await MainQueue.add("getLatestVideos", {});
}
export async function initMQ() {
await configGetLatestVideos();
}

View File

@ -0,0 +1,60 @@
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
export async function bisectVideoPageInNewList(timestamp: number): Promise<number | null> {
const pageSize = 50;
let lowPage = 1;
let highPage = 1;
let foundUpper = false;
while (true) {
const videos = await getLatestVideos(highPage * pageSize, 1, 250, false);
if (!videos || videos.length === 0) {
break;
}
const lastVideo = videos[0];
if (!lastVideo || !lastVideo.published_at) {
break;
}
const lastTime = Date.parse(lastVideo.published_at);
if (lastTime <= timestamp) {
foundUpper = true;
break;
} else {
lowPage = highPage;
highPage *= 2;
}
}
if (!foundUpper) {
return null;
}
let boundaryPage = highPage;
let lo = lowPage;
let hi = highPage;
while (lo <= hi) {
const mid = Math.floor((lo + hi) / 2);
const videos = await getLatestVideos(mid * pageSize, 1, 250, false);
if (!videos) {
return null;
}
if (videos.length === 0) {
hi = mid - 1;
continue;
}
const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) {
hi = mid - 1;
continue;
}
const lastTime = Date.parse(lastVideo.published_at);
if (lastTime > timestamp) {
lo = mid + 1;
} else {
boundaryPage = mid;
hi = mid - 1;
}
}
return boundaryPage * pageSize;
}

View File

@ -4,7 +4,7 @@ import { getVideoTags } from "lib/net/getVideoTags.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import { sleep } from "lib/utils/sleep.ts";
export async function getLatestVideos(page: number = 1, pageSize: number = 10): Promise<AllDataType[] | null> {
export async function getLatestVideos(page: number = 1, pageSize: number = 10, sleepRate: number = 250, fetchTags: boolean = true): Promise<AllDataType[] | null> {
try {
const response = await fetch(`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`);
const data: VideoListResponse = await response.json();
@ -21,8 +21,11 @@ export async function getLatestVideos(page: number = 1, pageSize: number = 10):
const videoPromises = data.data.archives.map(async (video) => {
const published_at = formatPublishedAt(video.pubdate + 3600 * 8);
sleep(Math.random() * pageSize * 250);
const tags = await getVideoTags(video.aid);
let tags = null;
if (fetchTags) {
sleep(Math.random() * pageSize * sleepRate);
tags = await getVideoTags(video.aid);
}
let processedTags = null;
if (tags !== null) {
processedTags = tags.join(',');

View File

@ -1,47 +1,34 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts";
import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts";
import { sleep } from "lib/utils/sleep.ts";
import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts";
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"];
const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined);
if (unsetVars.length > 0) {
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
export async function insertLatestVideos(
client: Client,
pageSize: number = 10,
sleepRate: number = 250,
intervalRate: number = 4000,
): Promise<number | null> {
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
if (latestVideoTimestamp == null) {
console.error("Cannot get latest video timestamp from current database.");
return null
}
const databaseHost = Deno.env.get("DB_HOST")!;
const databaseName = Deno.env.get("DB_NAME")!;
const databaseUser = Deno.env.get("DB_USER")!;
const databasePassword = Deno.env.get("DB_PASSWORD")!;
const databasePort = Deno.env.get("DB_PORT")!;
const postgresConfig = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseName,
user: databaseUser,
password: databasePassword,
};
async function connectToPostgres() {
const client = new Client(postgresConfig);
await client.connect();
return client;
const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp);
if (videoIndex == null) {
console.error("Cannot locate the video through bisect.");
return null
}
export async function insertLatestVideos() {
const client = await connectToPostgres();
let page = 334;
let page = Math.floor(videoIndex / pageSize) + 1;
let failCount = 0;
while (true) {
try {
const videos = await getLatestVideos(page, 10);
const videos = await getLatestVideos(page, pageSize, sleepRate);
if (videos == null) {
failCount++;
if (failCount > 5) {
break;
return null;
}
continue;
}
@ -49,33 +36,36 @@ export async function insertLatestVideos() {
console.warn("No more videos found");
break;
}
let allExists = true;
let allNotExists = true;
for (const video of videos) {
const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) {
allExists = false;
if (videoExists) {
allNotExists = false;
}
else {
insertIntoAllData(client, video);
}
}
if (allExists) {
console.log("All videos already exist in all_data, stop crawling.");
break;
if (allNotExists) {
page++;
console.warn(`All video not exist in the database, going back to older page.`);
continue;
}
console.log(`Page ${page} crawled, total: ${(page - 1) * 20 + videos.length} videos.`);
page++;
page--;
if (page == 0) {
return 0;
}
} catch (error) {
console.error(error);
failCount++;
if (failCount > 5) {
break;
return null;
}
continue;
}
finally {
await sleep(Math.random() * 4000 + 1000);
} finally {
await sleep(Math.random() * intervalRate + 1000);
}
}
return 0;
}
insertLatestVideos();

3
lib/utils/truncate.ts Normal file
View File

@ -0,0 +1,3 @@
export function truncate(num: number, min: number, max: number) {
return Math.max(min, Math.min(num, max))
}

View File

@ -2,7 +2,7 @@ import { assertEquals } from "jsr:@std/assert";
import { getVideoTags } from "lib/net/getVideoTags.ts";
Deno.test("Get video tags - regular video", async () => {
const tags = (await getVideoTags(826597951)).sort();
const tags = (await getVideoTags(826597951))!.sort();
assertEquals(tags, [
"纯白P",
"中华墨水娘",

21
worker.ts Normal file
View File

@ -0,0 +1,21 @@
import { Job, Worker } from "bullmq";
import { insertVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts";
const worker = new Worker(
"cvsa",
async (job: Job) => {
switch (job.name) {
case "getLatestVideos":
await insertVideosWorker(job);
break;
default:
break;
}
},
{ connection: redis, concurrency: 4 },
);
worker.on("error", (err) => {
console.error(err);
});