diff --git a/bullui.ts b/bullui.ts new file mode 100644 index 0000000..321820b --- /dev/null +++ b/bullui.ts @@ -0,0 +1,26 @@ +import express from "express"; +import { createBullBoard } from "@bull-board/api"; +import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; +import { ExpressAdapter } from "@bull-board/express"; + +import MainQueue from "lib/mq/index.ts"; + +const serverAdapter = new ExpressAdapter(); +serverAdapter.setBasePath("/"); + +createBullBoard({ + queues: [new BullMQAdapter(MainQueue)], + serverAdapter: serverAdapter, +}); + +const app = express(); + +app.use("/", serverAdapter.getRouter()); + +// other configurations of your server + +app.listen(3000, () => { + console.log("Running on 3000..."); + console.log("For the UI, open http://localhost:3000/"); + console.log("Make sure Redis is running on port 6379 by default"); +}); diff --git a/deno.json b/deno.json index 7f56aa2..e2af351 100644 --- a/deno.json +++ b/deno.json @@ -9,7 +9,8 @@ "start": "deno run -A --watch=static/,routes/ dev.ts", "build": "deno run -A dev.ts build", "preview": "deno run -A main.ts", - "update": "deno run -A -r https://fresh.deno.dev/update ." + "update": "deno run -A -r https://fresh.deno.dev/update .", + "worker": "deno run worker.ts" }, "lint": { "rules": { @@ -31,7 +32,10 @@ "@huggingface/transformers": "npm:@huggingface/transformers@3.0.0", "bullmq": "npm:bullmq", "lib/": "./lib/", - "ioredis": "npm:ioredis" + "ioredis": "npm:ioredis", + "@bull-board/api": "npm:@bull-board/api", + "@bull-board/express": "npm:@bull-board/express", + "express": "npm:express" }, "compilerOptions": { "jsx": "react-jsx", diff --git a/dev.ts b/dev.ts index 6a7ab02..fd088b1 100755 --- a/dev.ts +++ b/dev.ts @@ -4,7 +4,4 @@ import dev from "$fresh/dev.ts"; import config from "./fresh.config.ts"; import "$std/dotenv/load.ts"; -import { initMQ } from "lib/mq/init.ts"; - -await initMQ(); await dev(import.meta.url, "./main.ts", config); diff --git a/jobAdder.ts b/jobAdder.ts new file mode 100644 index 0000000..001804d --- /dev/null +++ b/jobAdder.ts @@ -0,0 +1,4 @@ +import { initMQ } from "lib/mq/init.ts"; + +await initMQ(); + diff --git a/lib/db/allData.ts b/lib/db/allData.ts index bdbb334..ed82106 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -7,7 +7,7 @@ export async function videoExistsInAllData(client: Client, aid: number) { } export async function insertIntoAllData(client: Client, data: AllDataType) { - console.log(`inserted ${data.aid}`) + console.log(`[db:all_data] inserted ${data.aid}`) return await client.queryObject( "INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", [data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at], diff --git a/lib/mq/executors.ts b/lib/mq/executors.ts index ae1a3fd..1168b30 100644 --- a/lib/mq/executors.ts +++ b/lib/mq/executors.ts @@ -4,23 +4,33 @@ import { insertLatestVideos } from "lib/task/insertLatestVideo.ts"; import MainQueue from "lib/mq/index.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { db } from "lib/db/init.ts"; -import { truncate } from "lib/utils/turncate.ts"; +import { truncate } from "lib/utils/truncate.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; const LAST_EXECUTED_KEY = "job:insert-videos:last-executed"; const DELTA = 15 * SECOND; const delayMap = [5, 10, 15, 30, 60, 60]; -const setLastExecutedTimestamp = async () => +const setLastExecutedTimestamp = async () => { await redis.set(LAST_EXECUTED_KEY, Date.now()); + console.log(`[redis] job:getLatestVideos last executed timestamp set to ${Date.now()}`); +} -const addJobToQueue = (failedCount: number, delay: number) => - MainQueue.add("getLatestVideos", { failedCount }, { delay }); +const addJobToQueue = async (failedCount: number, delay: number) => { + const job = await MainQueue.getJob("getLatestVideos"); + if (job && job.getState() === 'active') { + console.log(`[bullmq] job:getLatestVideos is already running.`); + return; + } + console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`) + MainQueue.add("getLatestVideos", { failedCount }, { delay: delay }) +}; export const insertVideosWorker = async (job: Job) => { const failedCount = (job.data.failedCount ?? 0) as number; const client = await db.connect(); const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY)); + console.log(`[redis] job:getLatestVideos last executed at ${new Date(lastExecutedTimestamp).toISOString()}`) if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) { await executeTask(client, failedCount); @@ -29,7 +39,8 @@ export const insertVideosWorker = async (job: Job) => { const diff = Date.now() - lastExecutedTimestamp; if (diff < 5 * MINUTE) { - addJobToQueue(0, diff + DELTA); + const waitTime = 5 * MINUTE - diff; + await addJobToQueue(0, waitTime + DELTA); return; } @@ -37,8 +48,9 @@ export const insertVideosWorker = async (job: Job) => { }; const executeTask = async (client: Client, failedCount: number) => { + console.log("[task] Executing task:getLatestVideos") const result = await insertLatestVideos(client); - await setLastExecutedTimestamp(); failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; - addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); + await setLastExecutedTimestamp(); + await addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); }; \ No newline at end of file diff --git a/lib/mq/init.ts b/lib/mq/init.ts index df01d22..caf8717 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -5,5 +5,6 @@ async function configGetLatestVideos() { } export async function initMQ() { - await configGetLatestVideos(); + await configGetLatestVideos() + console.log("Message queue initialized.") } diff --git a/lib/net/bisectVideoStartFrom.ts b/lib/net/bisectVideoStartFrom.ts index d54cbba..2dbd206 100644 --- a/lib/net/bisectVideoStartFrom.ts +++ b/lib/net/bisectVideoStartFrom.ts @@ -35,9 +35,9 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise 0) { + for (let i = 0; i < boundaryVideos.length; i++) { + const video = boundaryVideos[i]; + if (!video.published_at) { + continue; + } + const videoTime = Date.parse(video.published_at); + if (videoTime > timestamp) { + indexInPage++; + } else { + break; + } + } + } + + const count = (boundaryPage - 1) * pageSize + indexInPage; + + const safetyMargin = 5; + + return count + safetyMargin; } diff --git a/lib/task/insertLatestVideo.ts b/lib/task/insertLatestVideo.ts index 985bc18..2d286fc 100644 --- a/lib/task/insertLatestVideo.ts +++ b/lib/task/insertLatestVideo.ts @@ -3,6 +3,7 @@ import { getLatestVideos } from "lib/net/getLatestVideos.ts"; import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts"; import { sleep } from "lib/utils/sleep.ts"; import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts"; +import { SECOND } from "$std/datetime/constants.ts"; export async function insertLatestVideos( client: Client, @@ -12,16 +13,18 @@ export async function insertLatestVideos( ): Promise { const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client); if (latestVideoTimestamp == null) { - console.error("Cannot get latest video timestamp from current database."); + console.error("[func:insertLatestVideos] Cannot get latest video timestamp from current database."); return null } + console.log(`[func:insertLatestVideos] Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`) const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp); if (videoIndex == null) { - console.error("Cannot locate the video through bisect."); + console.error("[func:insertLatestVideos] Cannot locate the video through bisect."); return null } let page = Math.floor(videoIndex / pageSize) + 1; let failCount = 0; + const insertedVideos = new Set(); while (true) { try { const videos = await getLatestVideos(page, pageSize, sleepRate); @@ -32,28 +35,21 @@ export async function insertLatestVideos( } continue; } + failCount = 0; if (videos.length == 0) { console.warn("No more videos found"); break; } - let allNotExists = true; for (const video of videos) { const videoExists = await videoExistsInAllData(client, video.aid); - if (videoExists) { - allNotExists = false; - } - else { + if (!videoExists) { insertIntoAllData(client, video); + insertedVideos.add(video.aid); } } - if (allNotExists) { - page++; - console.warn(`All video not exist in the database, going back to older page.`); - continue; - } - console.log(`Page ${page} crawled, total: ${(page - 1) * 20 + videos.length} videos.`); + console.log(`[func:insertLatestVideos] Page ${page} crawled, total: ${insertedVideos.size} videos.`); page--; - if (page == 0) { + if (page < 1) { return 0; } } catch (error) { @@ -64,7 +60,7 @@ export async function insertLatestVideos( } continue; } finally { - await sleep(Math.random() * intervalRate + 1000); + await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND); } } return 0; diff --git a/worker.ts b/worker.ts index 33f6044..05a7c58 100644 --- a/worker.ts +++ b/worker.ts @@ -13,9 +13,13 @@ const worker = new Worker( break; } }, - { connection: redis, concurrency: 4 }, + { connection: redis }, ); +worker.on("active", () => { + console.log("[bullmq] Worker activated."); +}); + worker.on("error", (err) => { console.error(err); });