diff --git a/deno.json b/deno.json index 4594440..7f56aa2 100644 --- a/deno.json +++ b/deno.json @@ -30,7 +30,8 @@ "$std/": "https://deno.land/std@0.216.0/", "@huggingface/transformers": "npm:@huggingface/transformers@3.0.0", "bullmq": "npm:bullmq", - "lib/": "./lib/" + "lib/": "./lib/", + "ioredis": "npm:ioredis" }, "compilerOptions": { "jsx": "react-jsx", diff --git a/dev.ts b/dev.ts index ae73946..6a7ab02 100755 --- a/dev.ts +++ b/dev.ts @@ -4,5 +4,7 @@ import dev from "$fresh/dev.ts"; import config from "./fresh.config.ts"; import "$std/dotenv/load.ts"; +import { initMQ } from "lib/mq/init.ts"; +await initMQ(); await dev(import.meta.url, "./main.ts", config); diff --git a/lib/db/allData.ts b/lib/db/allData.ts index 1aa2ff4..bdbb334 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -13,3 +13,14 @@ export async function insertIntoAllData(client: Client, data: AllDataType) { [data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at], ); } + +export async function getLatestVideoTimestampFromAllData(client: Client) { + return await client.queryObject<{ published_at: string }>("SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1") + .then((result) => { + const date = new Date(result.rows[0].published_at); + if (isNaN(date.getTime())) { + return null; + } + return date.getTime(); + }); +} diff --git a/lib/db/init.ts b/lib/db/init.ts new file mode 100644 index 0000000..12e3c4f --- /dev/null +++ b/lib/db/init.ts @@ -0,0 +1,27 @@ +import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; + +const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"]; + +const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined); + +if (unsetVars.length > 0) { + throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`); +} + +const databaseHost = Deno.env.get("DB_HOST")!; +const databaseName = Deno.env.get("DB_NAME")!; +const databaseUser = Deno.env.get("DB_USER")!; +const databasePassword = Deno.env.get("DB_PASSWORD")!; +const databasePort = Deno.env.get("DB_PORT")!; + +const postgresConfig = { + hostname: databaseHost, + port: parseInt(databasePort), + database: databaseName, + user: databaseUser, + password: databasePassword, +}; + +const pool = new Pool(postgresConfig, 4); + +export const db = pool; diff --git a/lib/db/redis.ts b/lib/db/redis.ts new file mode 100644 index 0000000..cb7ebb3 --- /dev/null +++ b/lib/db/redis.ts @@ -0,0 +1,4 @@ + +import { Redis } from "ioredis"; + +export const redis = new Redis({ maxRetriesPerRequest: null }); \ No newline at end of file diff --git a/lib/mq/executors.ts b/lib/mq/executors.ts new file mode 100644 index 0000000..ae1a3fd --- /dev/null +++ b/lib/mq/executors.ts @@ -0,0 +1,44 @@ +import { Job } from "bullmq"; +import { redis } from "lib/db/redis.ts"; +import { insertLatestVideos } from "lib/task/insertLatestVideo.ts"; +import MainQueue from "lib/mq/index.ts"; +import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { db } from "lib/db/init.ts"; +import { truncate } from "lib/utils/turncate.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; + +const LAST_EXECUTED_KEY = "job:insert-videos:last-executed"; +const DELTA = 15 * SECOND; +const delayMap = [5, 10, 15, 30, 60, 60]; + +const setLastExecutedTimestamp = async () => + await redis.set(LAST_EXECUTED_KEY, Date.now()); + +const addJobToQueue = (failedCount: number, delay: number) => + MainQueue.add("getLatestVideos", { failedCount }, { delay }); + +export const insertVideosWorker = async (job: Job) => { + const failedCount = (job.data.failedCount ?? 0) as number; + const client = await db.connect(); + const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY)); + + if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) { + await executeTask(client, failedCount); + return; + } + + const diff = Date.now() - lastExecutedTimestamp; + if (diff < 5 * MINUTE) { + addJobToQueue(0, diff + DELTA); + return; + } + + await executeTask(client, failedCount); +}; + +const executeTask = async (client: Client, failedCount: number) => { + const result = await insertLatestVideos(client); + await setLastExecutedTimestamp(); + failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; + addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); +}; \ No newline at end of file diff --git a/lib/mq/index.ts b/lib/mq/index.ts index df17c36..2858a39 100644 --- a/lib/mq/index.ts +++ b/lib/mq/index.ts @@ -1,2 +1,5 @@ import { Queue } from "bullmq"; +const MainQueue = new Queue("cvsa"); + +export default MainQueue; \ No newline at end of file diff --git a/lib/mq/init.ts b/lib/mq/init.ts new file mode 100644 index 0000000..df01d22 --- /dev/null +++ b/lib/mq/init.ts @@ -0,0 +1,9 @@ +import MainQueue from "lib/mq/index.ts"; + +async function configGetLatestVideos() { + await MainQueue.add("getLatestVideos", {}); +} + +export async function initMQ() { + await configGetLatestVideos(); +} diff --git a/lib/net/bisectVideoStartFrom.ts b/lib/net/bisectVideoStartFrom.ts new file mode 100644 index 0000000..d54cbba --- /dev/null +++ b/lib/net/bisectVideoStartFrom.ts @@ -0,0 +1,60 @@ +import { getLatestVideos } from "lib/net/getLatestVideos.ts"; + +export async function bisectVideoPageInNewList(timestamp: number): Promise { + const pageSize = 50; + + let lowPage = 1; + let highPage = 1; + let foundUpper = false; + while (true) { + const videos = await getLatestVideos(highPage * pageSize, 1, 250, false); + if (!videos || videos.length === 0) { + break; + } + const lastVideo = videos[0]; + if (!lastVideo || !lastVideo.published_at) { + break; + } + const lastTime = Date.parse(lastVideo.published_at); + if (lastTime <= timestamp) { + foundUpper = true; + break; + } else { + lowPage = highPage; + highPage *= 2; + } + } + + if (!foundUpper) { + return null; + } + + let boundaryPage = highPage; + let lo = lowPage; + let hi = highPage; + while (lo <= hi) { + const mid = Math.floor((lo + hi) / 2); + const videos = await getLatestVideos(mid * pageSize, 1, 250, false); + if (!videos) { + return null; + } + if (videos.length === 0) { + hi = mid - 1; + continue; + } + const lastVideo = videos[videos.length - 1]; + if (!lastVideo || !lastVideo.published_at) { + hi = mid - 1; + continue; + } + const lastTime = Date.parse(lastVideo.published_at); + if (lastTime > timestamp) { + lo = mid + 1; + } else { + boundaryPage = mid; + hi = mid - 1; + } + } + + return boundaryPage * pageSize; +} diff --git a/lib/net/getLatestVideos.ts b/lib/net/getLatestVideos.ts index a46b735..3ca0c8b 100644 --- a/lib/net/getLatestVideos.ts +++ b/lib/net/getLatestVideos.ts @@ -4,7 +4,7 @@ import { getVideoTags } from "lib/net/getVideoTags.ts"; import { AllDataType } from "lib/db/schema.d.ts"; import { sleep } from "lib/utils/sleep.ts"; -export async function getLatestVideos(page: number = 1, pageSize: number = 10): Promise { +export async function getLatestVideos(page: number = 1, pageSize: number = 10, sleepRate: number = 250, fetchTags: boolean = true): Promise { try { const response = await fetch(`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`); const data: VideoListResponse = await response.json(); @@ -21,8 +21,11 @@ export async function getLatestVideos(page: number = 1, pageSize: number = 10): const videoPromises = data.data.archives.map(async (video) => { const published_at = formatPublishedAt(video.pubdate + 3600 * 8); - sleep(Math.random() * pageSize * 250); - const tags = await getVideoTags(video.aid); + let tags = null; + if (fetchTags) { + sleep(Math.random() * pageSize * sleepRate); + tags = await getVideoTags(video.aid); + } let processedTags = null; if (tags !== null) { processedTags = tags.join(','); diff --git a/lib/task/insertLatestVideo.ts b/lib/task/insertLatestVideo.ts index 3eaf60b..985bc18 100644 --- a/lib/task/insertLatestVideo.ts +++ b/lib/task/insertLatestVideo.ts @@ -1,47 +1,34 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { getLatestVideos } from "lib/net/getLatestVideos.ts"; -import { insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts"; +import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts"; import { sleep } from "lib/utils/sleep.ts"; +import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts"; -const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"]; - -const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined); - -if (unsetVars.length > 0) { - throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`); -} - -const databaseHost = Deno.env.get("DB_HOST")!; -const databaseName = Deno.env.get("DB_NAME")!; -const databaseUser = Deno.env.get("DB_USER")!; -const databasePassword = Deno.env.get("DB_PASSWORD")!; -const databasePort = Deno.env.get("DB_PORT")!; - -const postgresConfig = { - hostname: databaseHost, - port: parseInt(databasePort), - database: databaseName, - user: databaseUser, - password: databasePassword, -}; - -async function connectToPostgres() { - const client = new Client(postgresConfig); - await client.connect(); - return client; -} - -export async function insertLatestVideos() { - const client = await connectToPostgres(); - let page = 334; +export async function insertLatestVideos( + client: Client, + pageSize: number = 10, + sleepRate: number = 250, + intervalRate: number = 4000, +): Promise { + const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client); + if (latestVideoTimestamp == null) { + console.error("Cannot get latest video timestamp from current database."); + return null + } + const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp); + if (videoIndex == null) { + console.error("Cannot locate the video through bisect."); + return null + } + let page = Math.floor(videoIndex / pageSize) + 1; let failCount = 0; while (true) { try { - const videos = await getLatestVideos(page, 10); + const videos = await getLatestVideos(page, pageSize, sleepRate); if (videos == null) { failCount++; if (failCount > 5) { - break; + return null; } continue; } @@ -49,33 +36,36 @@ export async function insertLatestVideos() { console.warn("No more videos found"); break; } - let allExists = true; + let allNotExists = true; for (const video of videos) { const videoExists = await videoExistsInAllData(client, video.aid); - if (!videoExists) { - allExists = false; + if (videoExists) { + allNotExists = false; + } + else { insertIntoAllData(client, video); } } - if (allExists) { - console.log("All videos already exist in all_data, stop crawling."); - break; + if (allNotExists) { + page++; + console.warn(`All video not exist in the database, going back to older page.`); + continue; } console.log(`Page ${page} crawled, total: ${(page - 1) * 20 + videos.length} videos.`); - page++; + page--; + if (page == 0) { + return 0; + } } catch (error) { console.error(error); failCount++; if (failCount > 5) { - break; + return null; } continue; - } - finally { - await sleep(Math.random() * 4000 + 1000); + } finally { + await sleep(Math.random() * intervalRate + 1000); } } + return 0; } - - -insertLatestVideos(); \ No newline at end of file diff --git a/lib/utils/truncate.ts b/lib/utils/truncate.ts new file mode 100644 index 0000000..677978d --- /dev/null +++ b/lib/utils/truncate.ts @@ -0,0 +1,3 @@ +export function truncate(num: number, min: number, max: number) { + return Math.max(min, Math.min(num, max)) +} \ No newline at end of file diff --git a/test/net/getVideoTags.test.ts b/test/net/getVideoTags.test.ts index dd1f02a..0487dfb 100644 --- a/test/net/getVideoTags.test.ts +++ b/test/net/getVideoTags.test.ts @@ -2,7 +2,7 @@ import { assertEquals } from "jsr:@std/assert"; import { getVideoTags } from "lib/net/getVideoTags.ts"; Deno.test("Get video tags - regular video", async () => { - const tags = (await getVideoTags(826597951)).sort(); + const tags = (await getVideoTags(826597951))!.sort(); assertEquals(tags, [ "纯白P", "中华墨水娘", diff --git a/worker.ts b/worker.ts new file mode 100644 index 0000000..33f6044 --- /dev/null +++ b/worker.ts @@ -0,0 +1,21 @@ +import { Job, Worker } from "bullmq"; +import { insertVideosWorker } from "lib/mq/executors.ts"; +import { redis } from "lib/db/redis.ts"; + +const worker = new Worker( + "cvsa", + async (job: Job) => { + switch (job.name) { + case "getLatestVideos": + await insertVideosWorker(job); + break; + default: + break; + } + }, + { connection: redis, concurrency: 4 }, +); + +worker.on("error", (err) => { + console.error(err); +});