From d870627220542c1a9ded7d9b0d359b9f538a1f1b Mon Sep 17 00:00:00 2001 From: alikia2x Date: Wed, 12 Feb 2025 00:18:56 +0800 Subject: [PATCH] update: added tags fetching --- deno.json | 12 +-- lib/db/allData.ts | 23 ++++++ lib/mq/exec/getLatestVideos.ts | 14 +++- lib/mq/exec/getVideoTags.ts | 109 +++++++++++++++++++------- lib/mq/index.ts | 4 +- lib/mq/init.ts | 22 ++++-- lib/mq/lockManager.ts | 57 ++++++++++++++ lib/mq/scheduler.ts | 8 +- lib/net/getLatestVideos.ts | 70 +++++++++-------- lib/net/getVideoTags.ts | 2 +- lib/utils/formatTimestampToPostgre.ts | 12 ++- bullui.ts => src/bullui.ts | 5 +- jobAdder.ts => src/jobAdder.ts | 1 - src/worker.ts | 68 ++++++++++++++++ worker.ts | 26 ------ 15 files changed, 316 insertions(+), 117 deletions(-) create mode 100644 lib/mq/lockManager.ts rename bullui.ts => src/bullui.ts (80%) rename jobAdder.ts => src/jobAdder.ts (98%) create mode 100644 src/worker.ts delete mode 100644 worker.ts diff --git a/deno.json b/deno.json index 82bd6ac..b69c87d 100644 --- a/deno.json +++ b/deno.json @@ -10,9 +10,9 @@ "build": "deno run -A dev.ts build", "preview": "deno run -A main.ts", "update": "deno run -A -r https://fresh.deno.dev/update .", - "worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write worker.ts", - "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net jobAdder.ts", - "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net bullui.ts", + "worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts", + "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", + "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", "all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'", "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" }, @@ -39,7 +39,8 @@ "ioredis": "npm:ioredis", "@bull-board/api": "npm:@bull-board/api", "@bull-board/express": "npm:@bull-board/express", - "express": "npm:express" + "express": "npm:express", + "src/": "./src/" }, "compilerOptions": { "jsx": "react-jsx", @@ -50,6 +51,7 @@ "useTabs": true, "lineWidth": 120, "indentWidth": 4, - "semiColons": true + "semiColons": true, + "proseWrap": "always" } } diff --git a/lib/db/allData.ts b/lib/db/allData.ts index 9ec5659..92c225b 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -1,6 +1,7 @@ import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { AllDataType } from "lib/db/schema.d.ts"; import logger from "lib/log/logger.ts"; +import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; export async function videoExistsInAllData(client: Client, aid: number) { return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid]) @@ -36,3 +37,25 @@ export async function videoTagsIsNull(client: Client | Transaction, aid: number) [aid], ).then((result) => result.rows[0].exists); } + +export async function updateVideoTags(client: Client | Transaction, aid: number, tags: string[]) { + return await client.queryObject( + `UPDATE all_data SET tags = $1 WHERE aid = $2`, + [tags.join(","), aid], + ); +} + +export async function getNullVideoTagsList(client: Client) { + const queryResult = await client.queryObject<{ aid: number; published_at: string }>( + `SELECT aid, published_at FROM all_data WHERE tags IS NULL`, + ); + const rows = queryResult.rows; + return rows.map( + (row) => { + return { + aid: Number(row.aid), + published_at: parseTimestampFromPsql(row.published_at), + }; + }, + ); +} diff --git a/lib/mq/exec/getLatestVideos.ts b/lib/mq/exec/getLatestVideos.ts index 3098721..08bad1c 100644 --- a/lib/mq/exec/getLatestVideos.ts +++ b/lib/mq/exec/getLatestVideos.ts @@ -1,17 +1,18 @@ import { Job } from "bullmq"; import { insertLatestVideos } from "lib/task/insertLatestVideo.ts"; -import MainQueue from "lib/mq/index.ts"; +import { LatestVideosQueue } from "lib/mq/index.ts"; import { MINUTE } from "$std/datetime/constants.ts"; import { db } from "lib/db/init.ts"; import { truncate } from "lib/utils/truncate.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import logger from "lib/log/logger.ts"; +import { lockManager } from "lib/mq/lockManager.ts"; const delayMap = [5, 10, 15, 30, 60, 60]; const updateQueueInterval = async (failedCount: number, delay: number) => { logger.log(`job:getLatestVideos added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq"); - await MainQueue.upsertJobScheduler("getLatestVideos", { + await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { every: delay, }, { data: { @@ -22,7 +23,6 @@ const updateQueueInterval = async (failedCount: number, delay: number) => { }; const executeTask = async (client: Client, failedCount: number) => { - logger.log("getLatestVideos now executing", "task"); const result = await insertLatestVideos(client); failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; if (failedCount !== 0) { @@ -32,6 +32,13 @@ const executeTask = async (client: Client, failedCount: number) => { }; export const getLatestVideosWorker = async (job: Job) => { + if (await lockManager.isLocked("getLatestVideos")) { + logger.log("job:getLatestVideos is locked, skipping.", "mq"); + return; + } + + lockManager.acquireLock("getLatestVideos"); + const failedCount = (job.data.failedCount ?? 0) as number; const client = await db.connect(); @@ -39,6 +46,7 @@ export const getLatestVideosWorker = async (job: Job) => { await executeTask(client, failedCount); } finally { client.release(); + lockManager.releaseLock("getLatestVideos"); } return; }; diff --git a/lib/mq/exec/getVideoTags.ts b/lib/mq/exec/getVideoTags.ts index e62234f..72c678e 100644 --- a/lib/mq/exec/getVideoTags.ts +++ b/lib/mq/exec/getVideoTags.ts @@ -1,34 +1,65 @@ import { Job } from "bullmq"; -import { insertLatestVideos } from "lib/task/insertLatestVideo.ts"; -import MainQueue from "lib/mq/index.ts"; -import { MINUTE } from "$std/datetime/constants.ts"; +import { VideoTagsQueue } from "lib/mq/index.ts"; +import { DAY, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { db } from "lib/db/init.ts"; import { truncate } from "lib/utils/truncate.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import logger from "lib/log/logger.ts"; +import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts"; +import { getVideoTags } from "lib/net/getVideoTags.ts"; +import { NetSchedulerError } from "lib/mq/scheduler.ts"; +import { WorkerError } from "src/worker.ts"; -const delayMap = [5, 10, 15, 30, 60, 60]; - -const updateQueueInterval = async (failedCount: number, delay: number) => { - logger.log(`job:getVideoTags added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq"); - await MainQueue.upsertJobScheduler("getVideoTags", { - every: delay, - }, { - data: { - failedCount: failedCount, - }, - }); - return; +const delayMap = [0.5, 3, 5, 15, 30, 60]; +const getJobPriority = (diff: number) => { + let priority; + if (diff > 14 * DAY) { + priority = 10; + } else if (diff > 7 * DAY) { + priority = 7; + } else if (diff > DAY) { + priority = 5; + } else if (diff > 6 * HOUR) { + priority = 3; + } else if (diff > HOUR) { + priority = 2; + } else { + priority = 1; + } + return priority; }; -const executeTask = async (client: Client, failedCount: number) => { - logger.log("getLatestVideos now executing", "task"); - const result = await insertLatestVideos(client); - failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; - if (failedCount !== 0) { - await updateQueueInterval(failedCount, delayMap[failedCount] * MINUTE); +const executeTask = async (client: Client, aid: number, failedCount: number, job: Job) => { + try { + const result = await getVideoTags(aid); + if (!result) { + failedCount = truncate(failedCount + 1, 0, 5); + const delay = delayMap[failedCount] * MINUTE; + logger.log( + `job:getVideoTags added to queue, delay: ${delayMap[failedCount]} minutes.`, + "mq", + ); + await VideoTagsQueue.add("getVideoTags", { aid, failedCount }, { delay, priority: 6 - failedCount }); + return 1; + } + await updateVideoTags(client, aid, result); + logger.log(`Fetched tags for aid: ${aid}`, "task"); + return 0; + } catch (e) { + if (!(e instanceof NetSchedulerError)) { + throw new WorkerError( e, "task", "getVideoTags/fn:executeTask"); + } + const err = e as NetSchedulerError; + if (err.code === "NO_AVAILABLE_PROXY" || err.code === "PROXY_RATE_LIMITED") { + logger.warn(`No available proxy for fetching tags, delayed. aid: ${aid}`, "task"); + await VideoTagsQueue.add("getVideoTags", { aid, failedCount }, { + delay: 25 * SECOND * Math.random() + 5 * SECOND, + priority: job.priority, + }); + return 2; + } + throw new WorkerError(err, "task", "getVideoTags/fn:executeTask"); } - return; }; export const getVideoTagsWorker = async (job: Job) => { @@ -36,13 +67,33 @@ export const getVideoTagsWorker = async (job: Job) => { const client = await db.connect(); const aid = job.data.aid; if (!aid) { - return; + return 3; } - try { - await executeTask(client, failedCount); - } finally { - client.release(); - } - return; + const v = await executeTask(client, aid, failedCount, job); + client.release(); + return v; +}; + +export const getVideoTagsInitializer = async () => { + const client = await db.connect(); + const videos = await getNullVideoTagsList(client); + if (videos.length == 0) { + return 4; + } + const count = await VideoTagsQueue.getJobCounts("wait", "delayed", "active"); + const total = count.delayed + count.active + count.wait; + const max = 15; + const rest = truncate(max - total, 0, max); + + let i = 0; + for (const video of videos) { + if (i > rest) return 100 + i; + const aid = video.aid; + const timestamp = video.published_at; + const diff = Date.now() - timestamp; + await VideoTagsQueue.add("getVideoTags", { aid }, { priority: getJobPriority(diff) }); + i++; + } + return 0; }; diff --git a/lib/mq/index.ts b/lib/mq/index.ts index 2858a39..22511ab 100644 --- a/lib/mq/index.ts +++ b/lib/mq/index.ts @@ -1,5 +1,5 @@ import { Queue } from "bullmq"; -const MainQueue = new Queue("cvsa"); +export const LatestVideosQueue = new Queue("latestVideos"); -export default MainQueue; \ No newline at end of file +export const VideoTagsQueue = new Queue("videoTags"); diff --git a/lib/mq/init.ts b/lib/mq/init.ts index 9697233..a79ba03 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -1,14 +1,22 @@ -import { MINUTE } from "$std/datetime/constants.ts"; -import MainQueue from "lib/mq/index.ts"; +import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; import logger from "lib/log/logger.ts"; async function configGetLatestVideos() { - await MainQueue.upsertJobScheduler("getLatestVideos", { - every: 1 * MINUTE - }) + await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { + every: 1 * MINUTE, + }); +} + +async function configGetVideosTags() { + await VideoTagsQueue.upsertJobScheduler("getVideosTags", { + every: 30 * SECOND, + immediately: true, + }); } export async function initMQ() { - await configGetLatestVideos() - logger.log("Message queue initialized.") + await configGetLatestVideos(); + await configGetVideosTags(); + logger.log("Message queue initialized."); } diff --git a/lib/mq/lockManager.ts b/lib/mq/lockManager.ts new file mode 100644 index 0000000..0aa989e --- /dev/null +++ b/lib/mq/lockManager.ts @@ -0,0 +1,57 @@ +import { Redis } from "ioredis"; +import { redis } from "lib/db/redis.ts"; + +class LockManager { + private redis: Redis; + + /* + * Create a new LockManager + * @param redisClient The Redis client used to store the lock data + */ + constructor(redisClient: Redis) { + this.redis = redisClient; + } + + /* + * Acquire a lock for a given ID + * @param id The unique identifier for the lock + * @param timeout Optional timeout in seconds after which the lock will automatically be released + * @returns true if the lock was successfully acquired, false otherwise + */ + async acquireLock(id: string, timeout?: number): Promise { + const key = `cvsa:lock:${id}`; + const result = await this.redis.set(key, "locked", "NX"); + + if (result !== "OK") { + return false; + } + if (timeout) { + await this.redis.expire(key, timeout); + } + return true; + } + + /* + * Release a lock for a given ID + * @param id The unique identifier for the lock + * @returns true if the lock was successfully released, false otherwise + */ + async releaseLock(id: string): Promise { + const key = `cvsa:lock:${id}`; + const result = await this.redis.del(key); + return result === 1; + } + + /* + * Check if a lock is currently held for a given ID + * @param id The unique identifier for the lock + * @returns true if the lock is currently held, false otherwise + */ + async isLocked(id: string): Promise { + const key = `cvsa:lock:${id}`; + const result = await this.redis.exists(key); + return result === 1; + } +} + +export const lockManager = new LockManager(redis); diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index f456fdd..25e7705 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -22,12 +22,12 @@ type NetSchedulerErrorCode = | "NOT_IMPLEMENTED"; export class NetSchedulerError extends Error { - public errorCode: NetSchedulerErrorCode; + public code: NetSchedulerErrorCode; public rawError: unknown | undefined; constructor(message: string, errorCode: NetSchedulerErrorCode, rawError?: unknown) { super(message); this.name = "NetSchedulerError"; - this.errorCode = errorCode; + this.code = errorCode; this.rawError = rawError; } } @@ -87,11 +87,11 @@ class NetScheduler { async proxyRequest(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise { const proxy = this.proxies[proxyName]; if (!proxy) { - throw new NetSchedulerError(`Proxy "${proxy}" not found`, "PROXY_NOT_FOUND"); + throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); } if (!force && await this.getProxyAvailability(proxyName) === false) { - throw new NetSchedulerError(`Proxy "${proxy}" is rate limited`, "PROXY_RATE_LIMITED"); + throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED"); } if (proxy.limiter) { diff --git a/lib/net/getLatestVideos.ts b/lib/net/getLatestVideos.ts index df89099..33b539c 100644 --- a/lib/net/getLatestVideos.ts +++ b/lib/net/getLatestVideos.ts @@ -1,39 +1,45 @@ import { VideoListResponse } from "lib/net/bilibili.d.ts"; -import formatPublishedAt from "lib/utils/formatTimestampToPostgre.ts"; +import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts"; import { AllDataType } from "lib/db/schema.d.ts"; import logger from "lib/log/logger.ts"; +import { HOUR, SECOND } from "$std/datetime/constants.ts"; -export async function getLatestVideos(page: number = 1, pageSize: number = 10, sleepRate: number = 250, fetchTags: boolean = true): Promise { - try { - const response = await fetch(`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`); - const data: VideoListResponse = await response.json(); +export async function getLatestVideos( + page: number = 1, + pageSize: number = 10, + sleepRate: number = 250, + fetchTags: boolean = true, +): Promise { + try { + const response = await fetch( + `https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`, + ); + const data: VideoListResponse = await response.json(); - if (data.code !== 0) { - logger.error(`Error fetching videos: ${data.message}`, 'net', 'getLatestVideos'); - return null; - } + if (data.code !== 0) { + logger.error(`Error fetching videos: ${data.message}`, "net", "getLatestVideos"); + return null; + } - if (data.data.archives.length === 0) { - logger.verbose("No more videos found", 'net', 'getLatestVideos'); - return []; - } + if (data.data.archives.length === 0) { + logger.verbose("No more videos found", "net", "getLatestVideos"); + return []; + } - const videoData = data.data.archives.map((video) => { - const published_at = formatPublishedAt(video.pubdate + 3600 * 8); - return { - aid: video.aid, - bvid: video.bvid, - description: video.desc, - uid: video.owner.mid, - tags: null, - title: video.title, - published_at: published_at, - } as AllDataType; - }); - - return videoData; - } catch (error) { - logger.error(error as Error, "net", "getLatestVideos"); - return null; - } -} \ No newline at end of file + return data.data.archives.map((video) => { + const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR); + return { + aid: video.aid, + bvid: video.bvid, + description: video.desc, + uid: video.owner.mid, + tags: null, + title: video.title, + published_at: published_at, + } as AllDataType; + }); + } catch (error) { + logger.error(error as Error, "net", "getLatestVideos"); + return null; + } +} diff --git a/lib/net/getVideoTags.ts b/lib/net/getVideoTags.ts index c8f77be..4ec0af6 100644 --- a/lib/net/getVideoTags.ts +++ b/lib/net/getVideoTags.ts @@ -21,7 +21,7 @@ export async function getVideoTags(aid: number): Promise { } catch (e) { const error = e as NetSchedulerError; - if (error.errorCode == "FETCH_ERROR") { + if (error.code == "FETCH_ERROR") { const rawError = error.rawError! as Error; rawError.message = `Error fetching tags for video ${aid}: ` + rawError.message; logger.error(rawError, 'net', 'getVideoTags'); diff --git a/lib/utils/formatTimestampToPostgre.ts b/lib/utils/formatTimestampToPostgre.ts index 9b5140a..8dc37b2 100644 --- a/lib/utils/formatTimestampToPostgre.ts +++ b/lib/utils/formatTimestampToPostgre.ts @@ -1,4 +1,8 @@ -export default function formatTimestamp(timestamp: number) { - const date = new Date(timestamp * 1000); - return date.toISOString().slice(0, 19).replace("T", " "); -} \ No newline at end of file +export function formatTimestampToPsql(timestamp: number) { + const date = new Date(timestamp); + return date.toISOString().slice(0, 23).replace("T", " "); +} + +export function parseTimestampFromPsql(timestamp: string) { + return new Date(timestamp).getTime(); +} diff --git a/bullui.ts b/src/bullui.ts similarity index 80% rename from bullui.ts rename to src/bullui.ts index 321820b..9aab14b 100644 --- a/bullui.ts +++ b/src/bullui.ts @@ -2,14 +2,13 @@ import express from "express"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { ExpressAdapter } from "@bull-board/express"; - -import MainQueue from "lib/mq/index.ts"; +import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts"; const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath("/"); createBullBoard({ - queues: [new BullMQAdapter(MainQueue)], + queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)], serverAdapter: serverAdapter, }); diff --git a/jobAdder.ts b/src/jobAdder.ts similarity index 98% rename from jobAdder.ts rename to src/jobAdder.ts index 001804d..cb107f4 100644 --- a/jobAdder.ts +++ b/src/jobAdder.ts @@ -1,4 +1,3 @@ import { initMQ } from "lib/mq/init.ts"; await initMQ(); - diff --git a/src/worker.ts b/src/worker.ts new file mode 100644 index 0000000..75dfb94 --- /dev/null +++ b/src/worker.ts @@ -0,0 +1,68 @@ +import { Job, Worker } from "bullmq"; +import { getLatestVideosWorker } from "lib/mq/executors.ts"; +import { redis } from "lib/db/redis.ts"; +import logger from "lib/log/logger.ts"; +import {getVideoTagsWorker} from "lib/mq/exec/getVideoTags.ts"; +import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts"; + +export class WorkerError extends Error { + public service?: string; + public codePath?: string; + public rawError: Error; + constructor(rawError: Error, service?: string, codePath?: string) { + super(rawError.message); + this.name = "WorkerFailure"; + this.codePath = codePath; + this.service = service; + this.rawError = rawError; + } +} + +const latestVideoWorker = new Worker( + "latestVideos", + async (job: Job) => { + switch (job.name) { + case "getLatestVideos": + await getLatestVideosWorker(job); + break; + default: + break; + } + }, + { connection: redis, concurrency: 1 }, +); + +latestVideoWorker.on("active", () => { + logger.log("Worker activated.", "mq"); +}); + +latestVideoWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +}); + + +const videoTagsWorker = new Worker( + "videoTags", + async (job: Job) => { + switch (job.name) { + case "getVideoTags": + return await getVideoTagsWorker(job); + case "getVideosTags": + return await getVideoTagsInitializer(); + default: + break; + } + }, + { connection: redis, concurrency: 6 }, +); + +videoTagsWorker.on("active", () => { + logger.log("Worker activated.", "mq"); +}); + +videoTagsWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +}); + diff --git a/worker.ts b/worker.ts deleted file mode 100644 index 4ad92ee..0000000 --- a/worker.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Job, Worker } from "bullmq"; -import { getLatestVideosWorker } from "lib/mq/executors.ts"; -import { redis } from "lib/db/redis.ts"; -import logger from "lib/log/logger.ts"; - -const crawlerWorker = new Worker( - "cvsa", - async (job: Job) => { - switch (job.name) { - case "getLatestVideos": - await getLatestVideosWorker(job); - break; - default: - break; - } - }, - { connection: redis, concurrency: 10 }, -); - -crawlerWorker.on("active", () => { - logger.log("Worker activated.", "mq"); -}); - -crawlerWorker.on("error", (err) => { - logger.error(err); -});