fix: job won't actually execute

This commit is contained in:
alikia2x (寒寒) 2025-02-09 02:26:52 +08:00
parent 0c589b5d20
commit b4421a3b18
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
10 changed files with 99 additions and 34 deletions

26
bullui.ts Normal file
View File

@ -0,0 +1,26 @@
import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import MainQueue from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/");
createBullBoard({
queues: [new BullMQAdapter(MainQueue)],
serverAdapter: serverAdapter,
});
const app = express();
app.use("/", serverAdapter.getRouter());
// other configurations of your server
app.listen(3000, () => {
console.log("Running on 3000...");
console.log("For the UI, open http://localhost:3000/");
console.log("Make sure Redis is running on port 6379 by default");
});

View File

@ -9,7 +9,8 @@
"start": "deno run -A --watch=static/,routes/ dev.ts", "start": "deno run -A --watch=static/,routes/ dev.ts",
"build": "deno run -A dev.ts build", "build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts", "preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update ." "update": "deno run -A -r https://fresh.deno.dev/update .",
"worker": "deno run worker.ts"
}, },
"lint": { "lint": {
"rules": { "rules": {
@ -31,7 +32,10 @@
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0", "@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq", "bullmq": "npm:bullmq",
"lib/": "./lib/", "lib/": "./lib/",
"ioredis": "npm:ioredis" "ioredis": "npm:ioredis",
"@bull-board/api": "npm:@bull-board/api",
"@bull-board/express": "npm:@bull-board/express",
"express": "npm:express"
}, },
"compilerOptions": { "compilerOptions": {
"jsx": "react-jsx", "jsx": "react-jsx",

3
dev.ts
View File

@ -4,7 +4,4 @@ import dev from "$fresh/dev.ts";
import config from "./fresh.config.ts"; import config from "./fresh.config.ts";
import "$std/dotenv/load.ts"; import "$std/dotenv/load.ts";
import { initMQ } from "lib/mq/init.ts";
await initMQ();
await dev(import.meta.url, "./main.ts", config); await dev(import.meta.url, "./main.ts", config);

4
jobAdder.ts Normal file
View File

@ -0,0 +1,4 @@
import { initMQ } from "lib/mq/init.ts";
await initMQ();

View File

@ -7,7 +7,7 @@ export async function videoExistsInAllData(client: Client, aid: number) {
} }
export async function insertIntoAllData(client: Client, data: AllDataType) { export async function insertIntoAllData(client: Client, data: AllDataType) {
console.log(`inserted ${data.aid}`) console.log(`[db:all_data] inserted ${data.aid}`)
return await client.queryObject( return await client.queryObject(
"INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) VALUES ($1, $2, $3, $4, $5, $6, $7)", "INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) VALUES ($1, $2, $3, $4, $5, $6, $7)",
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at], [data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],

View File

@ -4,23 +4,33 @@ import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts"; import MainQueue from "lib/mq/index.ts";
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts"; import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/turncate.ts"; import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
const LAST_EXECUTED_KEY = "job:insert-videos:last-executed"; const LAST_EXECUTED_KEY = "job:insert-videos:last-executed";
const DELTA = 15 * SECOND; const DELTA = 15 * SECOND;
const delayMap = [5, 10, 15, 30, 60, 60]; const delayMap = [5, 10, 15, 30, 60, 60];
const setLastExecutedTimestamp = async () => const setLastExecutedTimestamp = async () => {
await redis.set(LAST_EXECUTED_KEY, Date.now()); await redis.set(LAST_EXECUTED_KEY, Date.now());
console.log(`[redis] job:getLatestVideos last executed timestamp set to ${Date.now()}`);
}
const addJobToQueue = (failedCount: number, delay: number) => const addJobToQueue = async (failedCount: number, delay: number) => {
MainQueue.add("getLatestVideos", { failedCount }, { delay }); const job = await MainQueue.getJob("getLatestVideos");
if (job && job.getState() === 'active') {
console.log(`[bullmq] job:getLatestVideos is already running.`);
return;
}
console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`)
MainQueue.add("getLatestVideos", { failedCount }, { delay: delay })
};
export const insertVideosWorker = async (job: Job) => { export const insertVideosWorker = async (job: Job) => {
const failedCount = (job.data.failedCount ?? 0) as number; const failedCount = (job.data.failedCount ?? 0) as number;
const client = await db.connect(); const client = await db.connect();
const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY)); const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY));
console.log(`[redis] job:getLatestVideos last executed at ${new Date(lastExecutedTimestamp).toISOString()}`)
if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) { if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) {
await executeTask(client, failedCount); await executeTask(client, failedCount);
@ -29,7 +39,8 @@ export const insertVideosWorker = async (job: Job) => {
const diff = Date.now() - lastExecutedTimestamp; const diff = Date.now() - lastExecutedTimestamp;
if (diff < 5 * MINUTE) { if (diff < 5 * MINUTE) {
addJobToQueue(0, diff + DELTA); const waitTime = 5 * MINUTE - diff;
await addJobToQueue(0, waitTime + DELTA);
return; return;
} }
@ -37,8 +48,9 @@ export const insertVideosWorker = async (job: Job) => {
}; };
const executeTask = async (client: Client, failedCount: number) => { const executeTask = async (client: Client, failedCount: number) => {
console.log("[task] Executing task:getLatestVideos")
const result = await insertLatestVideos(client); const result = await insertLatestVideos(client);
await setLastExecutedTimestamp();
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); await setLastExecutedTimestamp();
await addJobToQueue(failedCount, delayMap[failedCount] * MINUTE);
}; };

View File

@ -5,5 +5,6 @@ async function configGetLatestVideos() {
} }
export async function initMQ() { export async function initMQ() {
await configGetLatestVideos(); await configGetLatestVideos()
console.log("Message queue initialized.")
} }

View File

@ -35,9 +35,9 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise<numbe
while (lo <= hi) { while (lo <= hi) {
const mid = Math.floor((lo + hi) / 2); const mid = Math.floor((lo + hi) / 2);
const videos = await getLatestVideos(mid * pageSize, 1, 250, false); const videos = await getLatestVideos(mid * pageSize, 1, 250, false);
if (!videos) { if (!videos) {
return null; return null;
} }
if (videos.length === 0) { if (videos.length === 0) {
hi = mid - 1; hi = mid - 1;
continue; continue;
@ -56,5 +56,26 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise<numbe
} }
} }
return boundaryPage * pageSize; const boundaryVideos = await getLatestVideos(boundaryPage, pageSize, 250, false);
let indexInPage = 0;
if (boundaryVideos && boundaryVideos.length > 0) {
for (let i = 0; i < boundaryVideos.length; i++) {
const video = boundaryVideos[i];
if (!video.published_at) {
continue;
}
const videoTime = Date.parse(video.published_at);
if (videoTime > timestamp) {
indexInPage++;
} else {
break;
}
}
}
const count = (boundaryPage - 1) * pageSize + indexInPage;
const safetyMargin = 5;
return count + safetyMargin;
} }

View File

@ -3,6 +3,7 @@ import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts"; import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts";
import { sleep } from "lib/utils/sleep.ts"; import { sleep } from "lib/utils/sleep.ts";
import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts"; import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts";
import { SECOND } from "$std/datetime/constants.ts";
export async function insertLatestVideos( export async function insertLatestVideos(
client: Client, client: Client,
@ -12,16 +13,18 @@ export async function insertLatestVideos(
): Promise<number | null> { ): Promise<number | null> {
const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client); const latestVideoTimestamp = await getLatestVideoTimestampFromAllData(client);
if (latestVideoTimestamp == null) { if (latestVideoTimestamp == null) {
console.error("Cannot get latest video timestamp from current database."); console.error("[func:insertLatestVideos] Cannot get latest video timestamp from current database.");
return null return null
} }
console.log(`[func:insertLatestVideos] Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`)
const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp); const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp);
if (videoIndex == null) { if (videoIndex == null) {
console.error("Cannot locate the video through bisect."); console.error("[func:insertLatestVideos] Cannot locate the video through bisect.");
return null return null
} }
let page = Math.floor(videoIndex / pageSize) + 1; let page = Math.floor(videoIndex / pageSize) + 1;
let failCount = 0; let failCount = 0;
const insertedVideos = new Set();
while (true) { while (true) {
try { try {
const videos = await getLatestVideos(page, pageSize, sleepRate); const videos = await getLatestVideos(page, pageSize, sleepRate);
@ -32,28 +35,21 @@ export async function insertLatestVideos(
} }
continue; continue;
} }
failCount = 0;
if (videos.length == 0) { if (videos.length == 0) {
console.warn("No more videos found"); console.warn("No more videos found");
break; break;
} }
let allNotExists = true;
for (const video of videos) { for (const video of videos) {
const videoExists = await videoExistsInAllData(client, video.aid); const videoExists = await videoExistsInAllData(client, video.aid);
if (videoExists) { if (!videoExists) {
allNotExists = false;
}
else {
insertIntoAllData(client, video); insertIntoAllData(client, video);
insertedVideos.add(video.aid);
} }
} }
if (allNotExists) { console.log(`[func:insertLatestVideos] Page ${page} crawled, total: ${insertedVideos.size} videos.`);
page++;
console.warn(`All video not exist in the database, going back to older page.`);
continue;
}
console.log(`Page ${page} crawled, total: ${(page - 1) * 20 + videos.length} videos.`);
page--; page--;
if (page == 0) { if (page < 1) {
return 0; return 0;
} }
} catch (error) { } catch (error) {
@ -64,7 +60,7 @@ export async function insertLatestVideos(
} }
continue; continue;
} finally { } finally {
await sleep(Math.random() * intervalRate + 1000); await sleep(Math.random() * intervalRate + failCount * 3 * SECOND + SECOND);
} }
} }
return 0; return 0;

View File

@ -13,9 +13,13 @@ const worker = new Worker(
break; break;
} }
}, },
{ connection: redis, concurrency: 4 }, { connection: redis },
); );
worker.on("active", () => {
console.log("[bullmq] Worker activated.");
});
worker.on("error", (err) => { worker.on("error", (err) => {
console.error(err); console.error(err);
}); });