From 471a522d05426de3cd201e3a1f9422dea6d3ef37 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Tue, 11 Feb 2025 00:37:40 +0800 Subject: [PATCH] update: getVideoTags with NetScheduler improve: extracted PostgreSQL config --- lib/db/allData.ts | 21 ++++++++++++++++----- lib/db/init.ts | 25 ++----------------------- lib/db/pgConfig.ts | 21 +++++++++++++++++++++ lib/mq/scheduler.ts | 29 +++++++++++++++-------------- lib/net/getVideoTags.ts | 26 +++++++++++++++++++++----- test/db/videoTagIsNull.test.ts | 33 +++++++++++++++++++++++++++++++++ worker.ts | 8 ++++---- 7 files changed, 112 insertions(+), 51 deletions(-) create mode 100644 lib/db/pgConfig.ts create mode 100644 test/db/videoTagIsNull.test.ts diff --git a/lib/db/allData.ts b/lib/db/allData.ts index 08e83d6..9ec5659 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -1,22 +1,26 @@ -import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { AllDataType } from "lib/db/schema.d.ts"; import logger from "lib/log/logger.ts"; export async function videoExistsInAllData(client: Client, aid: number) { - return await client.queryObject<{ exists: boolean }>("SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)", [aid]) + return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid]) .then((result) => result.rows[0].exists); } export async function insertIntoAllData(client: Client, data: AllDataType) { - logger.log(`inserted ${data.aid}`, "db-all_data") + logger.log(`inserted ${data.aid}`, "db-all_data"); return await client.queryObject( - "INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (aid) DO NOTHING", + `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) + VALUES ($1, $2, $3, $4, $5, $6, $7) + ON CONFLICT (aid) DO NOTHING`, [data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at], ); } export async function getLatestVideoTimestampFromAllData(client: Client) { - return await client.queryObject<{ published_at: string }>("SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1") + return await client.queryObject<{ published_at: string }>( + `SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1`, + ) .then((result) => { const date = new Date(result.rows[0].published_at); if (isNaN(date.getTime())) { @@ -25,3 +29,10 @@ export async function getLatestVideoTimestampFromAllData(client: Client) { return date.getTime(); }); } + +export async function videoTagsIsNull(client: Client | Transaction, aid: number) { + return await client.queryObject<{ exists: boolean }>( + `SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1 AND tags IS NULL)`, + [aid], + ).then((result) => result.rows[0].exists); +} diff --git a/lib/db/init.ts b/lib/db/init.ts index 12e3c4f..ed4667d 100644 --- a/lib/db/init.ts +++ b/lib/db/init.ts @@ -1,27 +1,6 @@ import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import {postgresConfig} from "lib/db/pgConfig.ts"; -const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"]; - -const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined); - -if (unsetVars.length > 0) { - throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`); -} - -const databaseHost = Deno.env.get("DB_HOST")!; -const databaseName = Deno.env.get("DB_NAME")!; -const databaseUser = Deno.env.get("DB_USER")!; -const databasePassword = Deno.env.get("DB_PASSWORD")!; -const databasePort = Deno.env.get("DB_PORT")!; - -const postgresConfig = { - hostname: databaseHost, - port: parseInt(databasePort), - database: databaseName, - user: databaseUser, - password: databasePassword, -}; - -const pool = new Pool(postgresConfig, 4); +const pool = new Pool(postgresConfig, 32); export const db = pool; diff --git a/lib/db/pgConfig.ts b/lib/db/pgConfig.ts new file mode 100644 index 0000000..4c34ef4 --- /dev/null +++ b/lib/db/pgConfig.ts @@ -0,0 +1,21 @@ +const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"]; + +const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined); + +if (unsetVars.length > 0) { + throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`); +} + +const databaseHost = Deno.env.get("DB_HOST")!; +const databaseName = Deno.env.get("DB_NAME")!; +const databaseUser = Deno.env.get("DB_USER")!; +const databasePassword = Deno.env.get("DB_PASSWORD")!; +const databasePort = Deno.env.get("DB_PORT")!; + +export const postgresConfig = { + hostname: databaseHost, + port: parseInt(databasePort), + database: databaseName, + user: databaseUser, + password: databasePassword, +}; \ No newline at end of file diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index a061953..f456fdd 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -1,7 +1,7 @@ import logger from "lib/log/logger.ts"; -import { RateLimiter } from "lib/mq/rateLimiter.ts"; -import { SlidingWindow } from "lib/mq/slidingWindow.ts"; -import { redis } from "lib/db/redis.ts"; +import {RateLimiter} from "lib/mq/rateLimiter.ts"; +import {SlidingWindow} from "lib/mq/slidingWindow.ts"; +import {redis} from "lib/db/redis.ts"; import Redis from "ioredis"; interface Proxy { @@ -23,10 +23,12 @@ type NetSchedulerErrorCode = export class NetSchedulerError extends Error { public errorCode: NetSchedulerErrorCode; - constructor(message: string, errorCode: NetSchedulerErrorCode) { + public rawError: unknown | undefined; + constructor(message: string, errorCode: NetSchedulerErrorCode, rawError?: unknown) { super(message); this.name = "NetSchedulerError"; this.errorCode = errorCode; + this.rawError = rawError; } } @@ -56,7 +58,7 @@ class NetScheduler { * - The native `fetch` function threw an error: with error code FETCH_ERROR * - The proxy type is not supported: with error code NOT_IMPLEMENTED */ - async request(url: string, method: string = "GET", task: string): Promise { + async request(url: string, task: string, method: string = "GET"): Promise { // find a available proxy const proxiesNames = Object.keys(this.proxies); for (const proxyName of proxiesNames) { @@ -133,29 +135,28 @@ class NetScheduler { private async nativeRequest(url: string, method: string): Promise { try { const response = await fetch(url, { method }); - const data = await response.json() as R; - return data; + return await response.json() as R; } catch (e) { - logger.error(e as Error); - throw new NetSchedulerError("Fetch error", "FETCH_ERROR"); + throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e); } } } const netScheduler = new NetScheduler(); +netScheduler.addProxy("default", "native", "default"); netScheduler.addProxy("tags-native", "native", "getVideoTags"); const tagsRateLimiter = new RateLimiter("getVideoTags", [ { - window: new SlidingWindow(redis, 1.2), - max: 1, + window: new SlidingWindow(redis, 1), + max: 3, }, { window: new SlidingWindow(redis, 30), - max: 5, + max: 30, }, { - window: new SlidingWindow(redis, 5 * 60), - max: 70, + window: new SlidingWindow(redis, 2 * 60), + max: 50, }, ]); netScheduler.setProxyLimiter("tags-native", tagsRateLimiter); diff --git a/lib/net/getVideoTags.ts b/lib/net/getVideoTags.ts index 7838a73..c8f77be 100644 --- a/lib/net/getVideoTags.ts +++ b/lib/net/getVideoTags.ts @@ -1,19 +1,35 @@ import { VideoTagsResponse } from "lib/net/bilibili.d.ts"; +import netScheduler, {NetSchedulerError} from "lib/mq/scheduler.ts"; import logger from "lib/log/logger.ts"; +/* + * Fetch the tags for a video + * @param {number} aid The video's aid + * @return {Promise} A promise, which resolves to an array of tags, + * or null if an `fetch` error occurred + * @throws {NetSchedulerError} If the request failed. + */ export async function getVideoTags(aid: number): Promise { try { const url = `https://api.bilibili.com/x/tag/archive/tags?aid=${aid}`; - const res = await fetch(url); - const data: VideoTagsResponse = await res.json(); + const data = await netScheduler.request(url, 'getVideoTags'); if (data.code != 0) { logger.error(`Error fetching tags for video ${aid}: ${data.message}`, 'net', 'getVideoTags'); return []; } return data.data.map((tag) => tag.tag_name); } - catch { - logger.error(`Error fetching tags for video ${aid}`, 'net', 'getVideoTags'); - return null; + catch (e) { + const error = e as NetSchedulerError; + if (error.errorCode == "FETCH_ERROR") { + const rawError = error.rawError! as Error; + rawError.message = `Error fetching tags for video ${aid}: ` + rawError.message; + logger.error(rawError, 'net', 'getVideoTags'); + return null; + } + else { + // Re-throw the error + throw e; + } } } diff --git a/test/db/videoTagIsNull.test.ts b/test/db/videoTagIsNull.test.ts new file mode 100644 index 0000000..7ffc8cc --- /dev/null +++ b/test/db/videoTagIsNull.test.ts @@ -0,0 +1,33 @@ +import { assertEquals } from "jsr:@std/assert"; +import { videoTagsIsNull } from "lib/db/allData.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { postgresConfig } from "lib/db/pgConfig.ts"; + +// A minimal aid which has an empty tags field in our database +const TEST_AID = 63569; + +Deno.test("videoTagsIsNull function", async () => { + const client = new Client(postgresConfig); + + try { + const transaction = client.createTransaction("test_transaction"); + await transaction.begin(); + + const result1 = await videoTagsIsNull(transaction, TEST_AID); + assertEquals(typeof result1, "boolean", "The result should be a boolean value."); + assertEquals(result1, false, "The result should be false if tags is not NULL for the given aid."); + + await transaction.queryArray`UPDATE all_data SET tags = NULL WHERE aid = ${TEST_AID}`; + + const result2 = await videoTagsIsNull(transaction, TEST_AID); + assertEquals(typeof result2, "boolean", "The result should be a boolean value."); + assertEquals(result2, true, "The result should be true if tags is NULL for the given aid."); + + await transaction.rollback(); + } catch (error) { + console.error("Error during test:", error); + throw error; + } finally { + client.end(); + } +}); diff --git a/worker.ts b/worker.ts index 33da57b..4ad92ee 100644 --- a/worker.ts +++ b/worker.ts @@ -3,7 +3,7 @@ import { getLatestVideosWorker } from "lib/mq/executors.ts"; import { redis } from "lib/db/redis.ts"; import logger from "lib/log/logger.ts"; -const worker = new Worker( +const crawlerWorker = new Worker( "cvsa", async (job: Job) => { switch (job.name) { @@ -14,13 +14,13 @@ const worker = new Worker( break; } }, - { connection: redis, concurrency: 4 }, + { connection: redis, concurrency: 10 }, ); -worker.on("active", () => { +crawlerWorker.on("active", () => { logger.log("Worker activated.", "mq"); }); -worker.on("error", (err) => { +crawlerWorker.on("error", (err) => { logger.error(err); });