diff --git a/deno.json b/deno.json
index 8628b87..82bd6ac 100644
--- a/deno.json
+++ b/deno.json
@@ -13,7 +13,8 @@
         "worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write worker.ts",
         "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net jobAdder.ts",
         "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net bullui.ts",
-        "all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'"
+        "all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
+        "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
     },
     "lint": {
         "rules": {
diff --git a/lib/mq/exec/getLatestVideos.ts b/lib/mq/exec/getLatestVideos.ts
new file mode 100644
index 0000000..3098721
--- /dev/null
+++ b/lib/mq/exec/getLatestVideos.ts
@@ -0,0 +1,44 @@
+import { Job } from "bullmq";
+import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
+import MainQueue from "lib/mq/index.ts";
+import { MINUTE } from "$std/datetime/constants.ts";
+import { db } from "lib/db/init.ts";
+import { truncate } from "lib/utils/truncate.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "lib/log/logger.ts";
+
+const delayMap = [5, 10, 15, 30, 60, 60];
+
+const updateQueueInterval = async (failedCount: number, delay: number) => {
+    logger.log(`job:getLatestVideos added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
+    await MainQueue.upsertJobScheduler("getLatestVideos", {
+        every: delay,
+    }, {
+        data: {
+            failedCount: failedCount,
+        },
+    });
+    return;
+};
+
+const executeTask = async (client: Client, failedCount: number) => {
+    logger.log("getLatestVideos now executing", "task");
+    const result = await insertLatestVideos(client);
+    failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
+    if (failedCount !== 0) {
+        await updateQueueInterval(failedCount, delayMap[failedCount] * MINUTE);
+    }
+    return;
+};
+
+export const getLatestVideosWorker = async (job: Job) => {
+    const failedCount = (job.data.failedCount ?? 0) as number;
+    const client = await db.connect();
+
+    try {
+        await executeTask(client, failedCount);
+    } finally {
+        client.release();
+    }
+    return;
+};
diff --git a/lib/mq/exec/getVideoTags.ts b/lib/mq/exec/getVideoTags.ts
new file mode 100644
index 0000000..e62234f
--- /dev/null
+++ b/lib/mq/exec/getVideoTags.ts
@@ -0,0 +1,48 @@
+import { Job } from "bullmq";
+import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
+import MainQueue from "lib/mq/index.ts";
+import { MINUTE } from "$std/datetime/constants.ts";
+import { db } from "lib/db/init.ts";
+import { truncate } from "lib/utils/truncate.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "lib/log/logger.ts";
+
+const delayMap = [5, 10, 15, 30, 60, 60];
+
+const updateQueueInterval = async (failedCount: number, delay: number) => {
+    logger.log(`job:getVideoTags added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
+    await MainQueue.upsertJobScheduler("getVideoTags", {
+        every: delay,
+    }, {
+        data: {
+            failedCount: failedCount,
+        },
+    });
+    return;
+};
+
+const executeTask = async (client: Client, failedCount: number) => {
+    logger.log("getLatestVideos now executing", "task");
+    const result = await insertLatestVideos(client);
+    failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
+    if (failedCount !== 0) {
+        await updateQueueInterval(failedCount, delayMap[failedCount] * MINUTE);
+    }
+    return;
+};
+
+export const getVideoTagsWorker = async (job: Job) => {
+    const failedCount = (job.data.failedCount ?? 0) as number;
+    // Validate the payload before checking out a connection: returning early after connect() would leak the client.
+    const aid = job.data.aid;
+    if (!aid) {
+        return;
+    }
+    const client = await db.connect();
+    try {
+        await executeTask(client, failedCount);
+    } finally {
+        client.release();
+    }
+    return;
+};
diff --git a/lib/mq/executors.ts b/lib/mq/executors.ts
index 5f822b0..6af60b2 100644
--- a/lib/mq/executors.ts
+++ b/lib/mq/executors.ts
@@ -1,44 +1 @@
-import { Job } from "bullmq";
-import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
-import MainQueue from "lib/mq/index.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
-import { db } from "lib/db/init.ts";
-import { truncate } from "lib/utils/truncate.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import logger from "lib/log/logger.ts";
-
-const delayMap = [5, 10, 15, 30, 60, 60];
-
-const addJobToQueue = (failedCount: number, delay: number) => {
-    logger.log(`job:getLatestVideos added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
-    MainQueue.upsertJobScheduler("getLatestVideos", {
-        every: delay,
-    }, {
-        data: {
-            failedCount: failedCount,
-        },
-    });
-    return;
-};
-
-export const insertVideosWorker = async (job: Job) => {
-    const failedCount = (job.data.failedCount ?? 0) as number;
-    const client = await db.connect();
-
-    try {
-        await executeTask(client, failedCount);
-    } finally {
-        client.release();
-    }
-    return;
-};
-
-const executeTask = async (client: Client, failedCount: number) => {
-    logger.log("getLatestVideos now executing", "task");
-    const result = await insertLatestVideos(client);
-    failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
-    if (failedCount !== 0) {
-        addJobToQueue(failedCount, delayMap[failedCount] * MINUTE);
-    }
-    return;
-};
+export * from "lib/mq/exec/getLatestVideos.ts";
\ No newline at end of file
diff --git a/lib/mq/rateLimiter.ts b/lib/mq/rateLimiter.ts
new file mode 100644
index 0000000..52063f2
--- /dev/null
+++ b/lib/mq/rateLimiter.ts
@@ -0,0 +1,46 @@
+import { SlidingWindow } from "lib/mq/slidingWindow.ts";
+
+export interface RateLimiterConfig {
+    window: SlidingWindow;
+    max: number;
+}
+
+export class RateLimiter {
+    private readonly configs: RateLimiterConfig[];
+    private readonly configEventNames: string[];
+
+    /**
+     * @param name The name of the rate limiter; each config is tracked under the event name `${name}_config_${index}`
+     * @param configs The configuration of the rate limiter: one sliding window plus its maximum event count per entry
+     */
+    constructor(name: string, configs: RateLimiterConfig[]) {
+        this.configs = configs;
+        this.configEventNames = configs.map((_, index) => `${name}_config_${index}`);
+    }
+
+    /**
+     * Check whether a new event is allowed: resolves to false as soon as any window's count has reached its max
+     */
+    async getAvailability(): Promise<boolean> {
+        for (let i = 0; i < this.configs.length; i++) {
+            const config = this.configs[i];
+            const eventName = this.configEventNames[i];
+            const count = await config.window.count(eventName);
+            if (count >= config.max) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Trigger an event in the rate limiter, recording it in the window of every config
+     */
+    async trigger(): Promise<void> {
+        for (let i = 0; i < this.configs.length; i++) {
+            const config = this.configs[i];
+            const eventName = this.configEventNames[i];
+            await config.window.event(eventName);
+        }
+    }
+}
\ No newline at end of file
diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts
new file mode 100644
index 0000000..7b36940
--- /dev/null
+++ b/lib/mq/scheduler.ts
@@ -0,0 +1,128 @@
+import logger from "lib/log/logger.ts";
+import {RateLimiter} from "lib/mq/rateLimiter.ts";
+
+interface Proxy {
+    type: string;
+    task: string;
+    data?: {
+        [key: string]: string;
+    };
+    limiter?: RateLimiter;
+}
+
+interface ProxiesMap {
+    [name: string]: Proxy;
+}
+
+type NetSchedulerErrorCode =
+    | "NO_AVAILABLE_PROXY"
+    | "PROXY_RATE_LIMITED"
+    | "PROXY_NOT_FOUND"
+    | "FETCH_ERROR"
+    | "NOT_IMPLEMENTED";
+
+export class NetSchedulerError extends Error {
+    public errorCode: NetSchedulerErrorCode;
+    constructor(message: string, errorCode: NetSchedulerErrorCode) {
+        super(message);
+        this.name = "NetSchedulerError";
+        this.errorCode = errorCode;
+    }
+}
+
+export class NetScheduler {
+    private proxies: ProxiesMap = {};
+
+    addProxy(name: string, type: string, task: string): void {
+        this.proxies[name] = { type, task };
+    }
+
+    removeProxy(name: string): void {
+        delete this.proxies[name];
+    }
+
+    setProxyLimiter(name: string, limiter: RateLimiter): void {
+        this.proxies[name].limiter = limiter;
+    }
+
+    /**
+     * Make a request to the specified URL with any available proxy
+     * @param {string} url - The URL to request.
+     * @param {string} method - The HTTP method to use for the request. Default is "GET".
+     * @returns {Promise<R>} - A promise that resolves to the response body.
+     * @throws {NetSchedulerError} - The error will be thrown in following cases:
+     * - No available proxy currently: with error code NO_AVAILABLE_PROXY
+     * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
+     * - The native `fetch` function threw an error: with error code FETCH_ERROR
+     * - The proxy type is not supported: with error code NOT_IMPLEMENTED
+     */
+    async request<R>(url: string, method: string = "GET"): Promise<R> {
+        // find an available proxy
+        const proxiesNames = Object.keys(this.proxies);
+        for (const proxyName of proxiesNames) {
+            const proxy = this.proxies[proxyName];
+            if (!proxy.limiter) {
+                return await this.proxyRequest<R>(url, proxyName, method);
+            }
+            const proxyIsNotRateLimited = await proxy.limiter.getAvailability();
+            if (proxyIsNotRateLimited) {
+                return await this.proxyRequest<R>(url, proxyName, method);
+            }
+        }
+        throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
+    }
+
+    /**
+     * Make a request to the specified URL with the specified proxy
+     * @param {string} url - The URL to request.
+     * @param {string} proxyName - The name of the proxy to use.
+     * @param {string} method - The HTTP method to use for the request. Default is "GET".
+     * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
+     * @returns {Promise<R>} - A promise that resolves to the response body.
+     * @throws {NetSchedulerError} - The error will be thrown in following cases:
+     * - Proxy not found: with error code PROXY_NOT_FOUND
+     * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
+     * - The native `fetch` function threw an error: with error code FETCH_ERROR
+     * - The proxy type is not supported: with error code NOT_IMPLEMENTED
+     */
+    async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> {
+        const proxy = this.proxies[proxyName];
+        if (!proxy) {
+            throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
+        }
+        // Read the limiter only after the lookup succeeded; unless forced, refuse while it reports no availability.
+        const limiter = proxy.limiter;
+        if (!force && limiter && !(await limiter.getAvailability())) {
+            throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
+        }
+        // Record this request with the limiter (forced requests are recorded too).
+        if (limiter) {
+            await limiter.trigger();
+        }
+
+        switch (proxy.type) {
+            case "native":
+                return await this.nativeRequest<R>(url, method);
+            default:
+                throw new NetSchedulerError(`Proxy type ${proxy.type} not supported.`, "NOT_IMPLEMENTED");
+        }
+    }
+
+    async getProxyAvailability(name: string): Promise<boolean> {
+        const proxyConfig = this.proxies[name];
+        if (!proxyConfig || !proxyConfig.limiter) {
+            return true;
+        }
+        return await proxyConfig.limiter.getAvailability();
+    }
+
+    private async nativeRequest<R>(url: string, method: string): Promise<R> {
+        try {
+            const response = await fetch(url, { method });
+            return await response.json() as R;
+        } catch (e) {
+            logger.error(e as Error);
+            throw new NetSchedulerError("Fetch error", "FETCH_ERROR");
+        }
+    }
+}
diff --git a/lib/mq/slidingWindow.ts b/lib/mq/slidingWindow.ts
new file mode 100644
index 0000000..4cfbece
--- /dev/null
+++ b/lib/mq/slidingWindow.ts
@@ -0,0 +1,46 @@
+import { Redis } from "ioredis";
+
+export class SlidingWindow {
+    private redis: Redis;
+    private readonly windowSize: number;
+
+    constructor(redisClient: Redis, windowSize: number) {
+        this.redis = redisClient;
+        this.windowSize = windowSize;
+    }
+
+    /**
+     * Trigger an event in the sliding window
+     * @param eventName The name of the event
+     */
+    async event(eventName: string): Promise<void> {
+        const now = Date.now();
+        const key = `cvsa:sliding_window:${eventName}`;
+
+        const uniqueMember = `${now}-${Math.random()}`;
+        // Add current timestamp to an ordered set
+        await this.redis.zadd(key, now, uniqueMember);
+
+        // Remove timestamps outside the window
+        await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
+    }
+
+    /**
+     * Count the number of events in the sliding window
+     * @param eventName The name of the event
+     */
+    async count(eventName: string): Promise<number> {
+        const key = `cvsa:sliding_window:${eventName}`;
+        const now = Date.now();
+
+        // Remove timestamps outside the window
+        await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
+        // Get the number of timestamps in the window
+        return this.redis.zcard(key);
+    }
+
+    async clear(eventName: string): Promise<number> {
+        const key = `cvsa:sliding_window:${eventName}`;
+        return await this.redis.del(key);
+    }
+}
diff --git a/src/db/raw/videoInfo.ts b/src/db/raw/videoInfo.ts
index 54b3f32..10272b2 100644
--- a/src/db/raw/videoInfo.ts
+++ b/src/db/raw/videoInfo.ts
@@ -20,6 +20,7 @@ export async function getBiliBiliVideoInfo(bvidORaid?: string | number, region:
 
 async function proxyRequestWithRegion(url: string, region: string): Promise {
     const td = new TextDecoder();
+    // aliyun configure set --access-key-id $ALIYUN_AK --access-key-secret $ALIYUN_SK --region cn-shenzhen --profile CVSA-shenzhen --mode AK
     const p = await new Deno.Command("aliyun", {
         args: [
             "fc",
diff --git a/test/mq/rateLimiter.test.ts b/test/mq/rateLimiter.test.ts
new file mode 100644
index 0000000..c1df8f2
--- /dev/null
+++ b/test/mq/rateLimiter.test.ts
@@ -0,0 +1,96 @@
+import {assertEquals} from "jsr:@std/assert";
+import {redis} from "lib/db/redis.ts";
+import {SlidingWindow} from "lib/mq/slidingWindow.ts";
+import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
+import logger from "lib/log/logger.ts";
+
+Deno.test("RateLimiter works correctly", async () => {
+    await redis.del("cvsa:sliding_window:test_event_config_0");
+    const windowSize = 5000;
+    const maxRequests = 10;
+
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const config: RateLimiterConfig = {
+        window: slidingWindow,
+        max: maxRequests,
+    };
+    const rateLimiter = new RateLimiter("test_event", [config]);
+
+    // Initial availability should be true
+    assertEquals(await rateLimiter.getAvailability(), true);
+
+    // Trigger one more event than the limit allows
+    for (let i = 0; i < maxRequests + 1; i++) {
+        await rateLimiter.trigger();
+    }
+
+    logger.debug(`${await rateLimiter.getAvailability()}`);
+
+    // Availability should now be false
+    assertEquals(await rateLimiter.getAvailability(), false);
+
+    // Wait for the window to slide
+    await new Promise((resolve) => setTimeout(resolve, windowSize + 500)); // Add a small buffer
+
+    // Availability should be true again
+    assertEquals(await rateLimiter.getAvailability(), true);
+
+    // Clean up Redis after the test (important!)
+    await redis.del("cvsa:sliding_window:test_event_config_0");
+});
+
+Deno.test("Multiple configs work correctly", async () => {
+    await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys
+    await redis.del("cvsa:sliding_window:test_event_multi_config_1");
+    const windowSize1 = 1000; // 1 second window
+    const maxRequests1 = 2;
+    const windowSize2 = 5000; // 5 second window
+    const maxRequests2 = 6;
+
+    const slidingWindow1 = new SlidingWindow(redis, windowSize1);
+    const config1: RateLimiterConfig = {
+        window: slidingWindow1,
+        max: maxRequests1,
+    };
+    const slidingWindow2 = new SlidingWindow(redis, windowSize2);
+    const config2: RateLimiterConfig = {
+        window: slidingWindow2,
+        max: maxRequests2,
+    };
+    const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]);
+
+    // Initial availability should be true
+    assertEquals(await rateLimiter.getAvailability(), true);
+
+    // Trigger events up to the limit of the first config
+    for (let i = 0; i < maxRequests1; i++) {
+        await rateLimiter.trigger();
+    }
+
+    // Availability should now be false (due to config1)
+    assertEquals(await rateLimiter.getAvailability(), false);
+
+    // Wait for the first window to slide
+    await new Promise((resolve) => setTimeout(resolve, windowSize1 + 500)); // Add a small buffer
+
+    // Availability should now be true (due to config1)
+    assertEquals(await rateLimiter.getAvailability(), true); // Corrected Assertion
+
+    // Trigger events up to the limit of the second config
+    for (let i = maxRequests1; i < maxRequests2; i++) {
+        await rateLimiter.trigger();
+    }
+
+    // Availability should still be false (due to config2)
+    assertEquals(await rateLimiter.getAvailability(), false);
+
+    // Wait for the second window to slide
+    await new Promise((resolve) => setTimeout(resolve, windowSize2 + 500)); // Add a small buffer
+
+    // Availability should be true again
+    assertEquals(await rateLimiter.getAvailability(), true);
+
+    // Clean up Redis after the test (important!)
+    await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys
+    await redis.del("cvsa:sliding_window:test_event_multi_config_1");
+});
\ No newline at end of file
diff --git a/test/mq/slidingWindow.test.ts b/test/mq/slidingWindow.test.ts
new file mode 100644
index 0000000..c441694
--- /dev/null
+++ b/test/mq/slidingWindow.test.ts
@@ -0,0 +1,101 @@
+import { assertEquals } from "jsr:@std/assert";
+import { SlidingWindow } from "lib/mq/slidingWindow.ts";
+import { Redis } from "ioredis";
+
+Deno.test("SlidingWindow - event and count", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName = "test_event";
+    await slidingWindow.clear(eventName);
+
+    await slidingWindow.event(eventName);
+    const count = await slidingWindow.count(eventName);
+
+    assertEquals(count, 1);
+    await redis.quit();
+});
+
+Deno.test("SlidingWindow - multiple events", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName = "test_event";
+    await slidingWindow.clear(eventName);
+
+    await slidingWindow.event(eventName);
+    await slidingWindow.event(eventName);
+    await slidingWindow.event(eventName);
+    const count = await slidingWindow.count(eventName);
+
+    assertEquals(count, 3);
+    await redis.quit();
+});
+
+Deno.test("SlidingWindow - events outside window", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName = "test_event";
+    await slidingWindow.clear(eventName);
+
+    const now = Date.now();
+    await redis.zadd(`cvsa:sliding_window:${eventName}`, now - windowSize - 1000, now - windowSize - 1000); // Event outside the window
+    await slidingWindow.event(eventName); // Event inside the window
+
+    const count = await slidingWindow.count(eventName);
+
+    assertEquals(count, 1);
+    await redis.quit();
+});
+
+Deno.test("SlidingWindow - no events", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName = "test_event";
+    await slidingWindow.clear(eventName);
+
+    const count = await slidingWindow.count(eventName);
+
+    assertEquals(count, 0);
+    await redis.quit();
+});
+
+Deno.test("SlidingWindow - different event names", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName1 = "test_event_1";
+    const eventName2 = "test_event_2";
+    await slidingWindow.clear(eventName1);
+    await slidingWindow.clear(eventName2);
+
+    await slidingWindow.event(eventName1);
+    await slidingWindow.event(eventName2);
+
+    const count1 = await slidingWindow.count(eventName1);
+    const count2 = await slidingWindow.count(eventName2);
+
+    assertEquals(count1, 1);
+    assertEquals(count2, 1);
+    await redis.quit();
+});
+
+Deno.test("SlidingWindow - large number of events", async () => {
+    const redis = new Redis({ maxRetriesPerRequest: null });
+    const windowSize = 5000; // 5 seconds
+    const slidingWindow = new SlidingWindow(redis, windowSize);
+    const eventName = "test_event";
+    await slidingWindow.clear(eventName);
+    const numEvents = 1000;
+
+    for (let i = 0; i < numEvents; i++) {
+        await slidingWindow.event(eventName);
+    }
+
+    const count = await slidingWindow.count(eventName);
+
+    assertEquals(count, numEvents);
+    await redis.quit();
+});
diff --git a/worker.ts b/worker.ts
index 34564a4..33da57b 100644
--- a/worker.ts
+++ b/worker.ts
@@ -1,5 +1,5 @@
 import { Job, Worker } from "bullmq";
-import { insertVideosWorker } from "lib/mq/executors.ts";
+import { getLatestVideosWorker } from "lib/mq/executors.ts";
 import { redis } from "lib/db/redis.ts";
 import logger from "lib/log/logger.ts";
 
@@ -8,7 +8,7 @@ const worker = new Worker(
     async (job: Job) => {
         switch (job.name) {
             case "getLatestVideos":
-                await insertVideosWorker(job);
+                await getLatestVideosWorker(job);
                 break;
             default:
                 break;