update: added tags fetching

This commit is contained in:
alikia2x (寒寒) 2025-02-12 00:18:56 +08:00
parent bdb6568ae5
commit d870627220
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
15 changed files with 316 additions and 117 deletions

View File

@ -10,9 +10,9 @@
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write worker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net bullui.ts",
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/worker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
},
@ -39,7 +39,8 @@
"ioredis": "npm:ioredis",
"@bull-board/api": "npm:@bull-board/api",
"@bull-board/express": "npm:@bull-board/express",
"express": "npm:express"
"express": "npm:express",
"src/": "./src/"
},
"compilerOptions": {
"jsx": "react-jsx",
@ -50,6 +51,7 @@
"useTabs": true,
"lineWidth": 120,
"indentWidth": 4,
"semiColons": true
"semiColons": true,
"proseWrap": "always"
}
}

View File

@ -1,6 +1,7 @@
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
@ -36,3 +37,25 @@ export async function videoTagsIsNull(client: Client | Transaction, aid: number)
[aid],
).then((result) => result.rows[0].exists);
}
export async function updateVideoTags(client: Client | Transaction, aid: number, tags: string[]) {
return await client.queryObject(
`UPDATE all_data SET tags = $1 WHERE aid = $2`,
[tags.join(","), aid],
);
}
export async function getNullVideoTagsList(client: Client) {
const queryResult = await client.queryObject<{ aid: number; published_at: string }>(
`SELECT aid, published_at FROM all_data WHERE tags IS NULL`,
);
const rows = queryResult.rows;
return rows.map(
(row) => {
return {
aid: Number(row.aid),
published_at: parseTimestampFromPsql(row.published_at),
};
},
);
}

View File

@ -1,17 +1,18 @@
import { Job } from "bullmq";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { LatestVideosQueue } from "lib/mq/index.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
const delayMap = [5, 10, 15, 30, 60, 60];
const updateQueueInterval = async (failedCount: number, delay: number) => {
logger.log(`job:getLatestVideos added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
await MainQueue.upsertJobScheduler("getLatestVideos", {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: delay,
}, {
data: {
@ -22,7 +23,6 @@ const updateQueueInterval = async (failedCount: number, delay: number) => {
};
const executeTask = async (client: Client, failedCount: number) => {
logger.log("getLatestVideos now executing", "task");
const result = await insertLatestVideos(client);
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
if (failedCount !== 0) {
@ -32,6 +32,13 @@ const executeTask = async (client: Client, failedCount: number) => {
};
export const getLatestVideosWorker = async (job: Job) => {
if (await lockManager.isLocked("getLatestVideos")) {
logger.log("job:getLatestVideos is locked, skipping.", "mq");
return;
}
lockManager.acquireLock("getLatestVideos");
const failedCount = (job.data.failedCount ?? 0) as number;
const client = await db.connect();
@ -39,6 +46,7 @@ export const getLatestVideosWorker = async (job: Job) => {
await executeTask(client, failedCount);
} finally {
client.release();
lockManager.releaseLock("getLatestVideos");
}
return;
};

View File

@ -1,34 +1,65 @@
import { Job } from "bullmq";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { VideoTagsQueue } from "lib/mq/index.ts";
import { DAY, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "lib/log/logger.ts";
import { getNullVideoTagsList, updateVideoTags } from "lib/db/allData.ts";
import { getVideoTags } from "lib/net/getVideoTags.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { WorkerError } from "src/worker.ts";
const delayMap = [5, 10, 15, 30, 60, 60];
const updateQueueInterval = async (failedCount: number, delay: number) => {
logger.log(`job:getVideoTags added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
await MainQueue.upsertJobScheduler("getVideoTags", {
every: delay,
}, {
data: {
failedCount: failedCount,
},
});
return;
const delayMap = [0.5, 3, 5, 15, 30, 60];
const getJobPriority = (diff: number) => {
let priority;
if (diff > 14 * DAY) {
priority = 10;
} else if (diff > 7 * DAY) {
priority = 7;
} else if (diff > DAY) {
priority = 5;
} else if (diff > 6 * HOUR) {
priority = 3;
} else if (diff > HOUR) {
priority = 2;
} else {
priority = 1;
}
return priority;
};
const executeTask = async (client: Client, failedCount: number) => {
logger.log("getLatestVideos now executing", "task");
const result = await insertLatestVideos(client);
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
if (failedCount !== 0) {
await updateQueueInterval(failedCount, delayMap[failedCount] * MINUTE);
const executeTask = async (client: Client, aid: number, failedCount: number, job: Job) => {
try {
const result = await getVideoTags(aid);
if (!result) {
failedCount = truncate(failedCount + 1, 0, 5);
const delay = delayMap[failedCount] * MINUTE;
logger.log(
`job:getVideoTags added to queue, delay: ${delayMap[failedCount]} minutes.`,
"mq",
);
await VideoTagsQueue.add("getVideoTags", { aid, failedCount }, { delay, priority: 6 - failedCount });
return 1;
}
await updateVideoTags(client, aid, result);
logger.log(`Fetched tags for aid: ${aid}`, "task");
return 0;
} catch (e) {
if (!(e instanceof NetSchedulerError)) {
throw new WorkerError(<Error> e, "task", "getVideoTags/fn:executeTask");
}
const err = e as NetSchedulerError;
if (err.code === "NO_AVAILABLE_PROXY" || err.code === "PROXY_RATE_LIMITED") {
logger.warn(`No available proxy for fetching tags, delayed. aid: ${aid}`, "task");
await VideoTagsQueue.add("getVideoTags", { aid, failedCount }, {
delay: 25 * SECOND * Math.random() + 5 * SECOND,
priority: job.priority,
});
return 2;
}
throw new WorkerError(err, "task", "getVideoTags/fn:executeTask");
}
return;
};
export const getVideoTagsWorker = async (job: Job) => {
@ -36,13 +67,33 @@ export const getVideoTagsWorker = async (job: Job) => {
const client = await db.connect();
const aid = job.data.aid;
if (!aid) {
return;
return 3;
}
try {
await executeTask(client, failedCount);
} finally {
const v = await executeTask(client, aid, failedCount, job);
client.release();
}
return;
return v;
};
export const getVideoTagsInitializer = async () => {
const client = await db.connect();
const videos = await getNullVideoTagsList(client);
if (videos.length == 0) {
return 4;
}
const count = await VideoTagsQueue.getJobCounts("wait", "delayed", "active");
const total = count.delayed + count.active + count.wait;
const max = 15;
const rest = truncate(max - total, 0, max);
let i = 0;
for (const video of videos) {
if (i > rest) return 100 + i;
const aid = video.aid;
const timestamp = video.published_at;
const diff = Date.now() - timestamp;
await VideoTagsQueue.add("getVideoTags", { aid }, { priority: getJobPriority(diff) });
i++;
}
return 0;
};

View File

@ -1,5 +1,5 @@
import { Queue } from "bullmq";
const MainQueue = new Queue("cvsa");
export const LatestVideosQueue = new Queue("latestVideos");
export default MainQueue;
export const VideoTagsQueue = new Queue("videoTags");

View File

@ -1,14 +1,22 @@
import { MINUTE } from "$std/datetime/constants.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
async function configGetLatestVideos() {
await MainQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE
})
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
});
}
async function configGetVideosTags() {
await VideoTagsQueue.upsertJobScheduler("getVideosTags", {
every: 30 * SECOND,
immediately: true,
});
}
export async function initMQ() {
await configGetLatestVideos()
logger.log("Message queue initialized.")
await configGetLatestVideos();
await configGetVideosTags();
logger.log("Message queue initialized.");
}

57
lib/mq/lockManager.ts Normal file
View File

@ -0,0 +1,57 @@
import { Redis } from "ioredis";
import { redis } from "lib/db/redis.ts";
class LockManager {
private redis: Redis;
/*
* Create a new LockManager
* @param redisClient The Redis client used to store the lock data
*/
constructor(redisClient: Redis) {
this.redis = redisClient;
}
/*
* Acquire a lock for a given ID
* @param id The unique identifier for the lock
* @param timeout Optional timeout in seconds after which the lock will automatically be released
* @returns true if the lock was successfully acquired, false otherwise
*/
async acquireLock(id: string, timeout?: number): Promise<boolean> {
const key = `cvsa:lock:${id}`;
const result = await this.redis.set(key, "locked", "NX");
if (result !== "OK") {
return false;
}
if (timeout) {
await this.redis.expire(key, timeout);
}
return true;
}
/*
* Release a lock for a given ID
* @param id The unique identifier for the lock
* @returns true if the lock was successfully released, false otherwise
*/
async releaseLock(id: string): Promise<boolean> {
const key = `cvsa:lock:${id}`;
const result = await this.redis.del(key);
return result === 1;
}
/*
* Check if a lock is currently held for a given ID
* @param id The unique identifier for the lock
* @returns true if the lock is currently held, false otherwise
*/
async isLocked(id: string): Promise<boolean> {
const key = `cvsa:lock:${id}`;
const result = await this.redis.exists(key);
return result === 1;
}
}
export const lockManager = new LockManager(redis);

View File

@ -22,12 +22,12 @@ type NetSchedulerErrorCode =
| "NOT_IMPLEMENTED";
export class NetSchedulerError extends Error {
public errorCode: NetSchedulerErrorCode;
public code: NetSchedulerErrorCode;
public rawError: unknown | undefined;
constructor(message: string, errorCode: NetSchedulerErrorCode, rawError?: unknown) {
super(message);
this.name = "NetSchedulerError";
this.errorCode = errorCode;
this.code = errorCode;
this.rawError = rawError;
}
}
@ -87,11 +87,11 @@ class NetScheduler {
async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> {
const proxy = this.proxies[proxyName];
if (!proxy) {
throw new NetSchedulerError(`Proxy "${proxy}" not found`, "PROXY_NOT_FOUND");
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
}
if (!force && await this.getProxyAvailability(proxyName) === false) {
throw new NetSchedulerError(`Proxy "${proxy}" is rate limited`, "PROXY_RATE_LIMITED");
throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
}
if (proxy.limiter) {

View File

@ -1,25 +1,33 @@
import { VideoListResponse } from "lib/net/bilibili.d.ts";
import formatPublishedAt from "lib/utils/formatTimestampToPostgre.ts";
import { formatTimestampToPsql as formatPublishedAt } from "lib/utils/formatTimestampToPostgre.ts";
import { AllDataType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function getLatestVideos(page: number = 1, pageSize: number = 10, sleepRate: number = 250, fetchTags: boolean = true): Promise<AllDataType[] | null> {
export async function getLatestVideos(
page: number = 1,
pageSize: number = 10,
sleepRate: number = 250,
fetchTags: boolean = true,
): Promise<AllDataType[] | null> {
try {
const response = await fetch(`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`);
const response = await fetch(
`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
);
const data: VideoListResponse = await response.json();
if (data.code !== 0) {
logger.error(`Error fetching videos: ${data.message}`, 'net', 'getLatestVideos');
logger.error(`Error fetching videos: ${data.message}`, "net", "getLatestVideos");
return null;
}
if (data.data.archives.length === 0) {
logger.verbose("No more videos found", 'net', 'getLatestVideos');
logger.verbose("No more videos found", "net", "getLatestVideos");
return [];
}
const videoData = data.data.archives.map((video) => {
const published_at = formatPublishedAt(video.pubdate + 3600 * 8);
return data.data.archives.map((video) => {
const published_at = formatPublishedAt(video.pubdate * SECOND + 8 * HOUR);
return {
aid: video.aid,
bvid: video.bvid,
@ -30,8 +38,6 @@ export async function getLatestVideos(page: number = 1, pageSize: number = 10, s
published_at: published_at,
} as AllDataType;
});
return videoData;
} catch (error) {
logger.error(error as Error, "net", "getLatestVideos");
return null;

View File

@ -21,7 +21,7 @@ export async function getVideoTags(aid: number): Promise<string[] | null> {
}
catch (e) {
const error = e as NetSchedulerError;
if (error.errorCode == "FETCH_ERROR") {
if (error.code == "FETCH_ERROR") {
const rawError = error.rawError! as Error;
rawError.message = `Error fetching tags for video ${aid}: ` + rawError.message;
logger.error(rawError, 'net', 'getVideoTags');

View File

@ -1,4 +1,8 @@
export default function formatTimestamp(timestamp: number) {
const date = new Date(timestamp * 1000);
return date.toISOString().slice(0, 19).replace("T", " ");
export function formatTimestampToPsql(timestamp: number) {
const date = new Date(timestamp);
return date.toISOString().slice(0, 23).replace("T", " ");
}
export function parseTimestampFromPsql(timestamp: string) {
return new Date(timestamp).getTime();
}

View File

@ -2,14 +2,13 @@ import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import MainQueue from "lib/mq/index.ts";
import { LatestVideosQueue, VideoTagsQueue } from "lib/mq/index.ts";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/");
createBullBoard({
queues: [new BullMQAdapter(MainQueue)],
queues: [new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(VideoTagsQueue)],
serverAdapter: serverAdapter,
});

View File

@ -1,4 +1,3 @@
import { initMQ } from "lib/mq/init.ts";
await initMQ();

68
src/worker.ts Normal file
View File

@ -0,0 +1,68 @@
import { Job, Worker } from "bullmq";
import { getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
import {getVideoTagsWorker} from "lib/mq/exec/getVideoTags.ts";
import { getVideoTagsInitializer } from "lib/mq/exec/getVideoTags.ts";
export class WorkerError extends Error {
public service?: string;
public codePath?: string;
public rawError: Error;
constructor(rawError: Error, service?: string, codePath?: string) {
super(rawError.message);
this.name = "WorkerFailure";
this.codePath = codePath;
this.service = service;
this.rawError = rawError;
}
}
const latestVideoWorker = new Worker(
"latestVideos",
async (job: Job) => {
switch (job.name) {
case "getLatestVideos":
await getLatestVideosWorker(job);
break;
default:
break;
}
},
{ connection: redis, concurrency: 1 },
);
latestVideoWorker.on("active", () => {
logger.log("Worker activated.", "mq");
});
latestVideoWorker.on("error", (err) => {
const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath);
});
const videoTagsWorker = new Worker(
"videoTags",
async (job: Job) => {
switch (job.name) {
case "getVideoTags":
return await getVideoTagsWorker(job);
case "getVideosTags":
return await getVideoTagsInitializer();
default:
break;
}
},
{ connection: redis, concurrency: 6 },
);
videoTagsWorker.on("active", () => {
logger.log("Worker activated.", "mq");
});
videoTagsWorker.on("error", (err) => {
const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath);
});

View File

@ -1,26 +0,0 @@
import { Job, Worker } from "bullmq";
import { getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
const crawlerWorker = new Worker(
"cvsa",
async (job: Job) => {
switch (job.name) {
case "getLatestVideos":
await getLatestVideosWorker(job);
break;
default:
break;
}
},
{ connection: redis, concurrency: 10 },
);
crawlerWorker.on("active", () => {
logger.log("Worker activated.", "mq");
});
crawlerWorker.on("error", (err) => {
logger.error(err);
});