From 248978a3e8e28f526029990f62be354d65d6c26c Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 10 Feb 2025 22:48:37 +0800 Subject: [PATCH] improve: error handling for NetScheduler fix: incorrect test code --- lib/db/redis.ts | 1 - lib/mq/rateLimiter.ts | 12 +++++- lib/mq/scheduler.ts | 79 +++++++++++++++++++++++++---------- lib/mq/slidingWindow.ts | 11 +++-- test/mq/rateLimiter.test.ts | 35 +++++++--------- test/mq/slidingWindow.test.ts | 29 +++---------- 6 files changed, 98 insertions(+), 69 deletions(-) diff --git a/lib/db/redis.ts b/lib/db/redis.ts index cb7ebb3..7e8152f 100644 --- a/lib/db/redis.ts +++ b/lib/db/redis.ts @@ -1,4 +1,3 @@ - import { Redis } from "ioredis"; export const redis = new Redis({ maxRetriesPerRequest: null }); \ No newline at end of file diff --git a/lib/mq/rateLimiter.ts b/lib/mq/rateLimiter.ts index 52063f2..41a2f4f 100644 --- a/lib/mq/rateLimiter.ts +++ b/lib/mq/rateLimiter.ts @@ -11,7 +11,9 @@ export class RateLimiter { /* * @param name The name of the rate limiter - * @param configs The configuration of the rate limiter + * @param configs The configuration of the rate limiter, containing: + * - window: The sliding window to use + * - max: The maximum number of events allowed in the window */ constructor(name: string, configs: RateLimiterConfig[]) { this.configs = configs; @@ -43,4 +45,12 @@ export class RateLimiter { await config.window.event(eventName); } } + + async clear(): Promise { + for (let i = 0; i < this.configs.length; i++) { + const config = this.configs[i]; + const eventName = this.configEventNames[i]; + await config.window.clear(eventName); + } + } } \ No newline at end of file diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index 24fce04..a061953 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -1,5 +1,8 @@ import logger from "lib/log/logger.ts"; -import {RateLimiter} from "lib/mq/rateLimiter.ts"; +import { RateLimiter } from "lib/mq/rateLimiter.ts"; +import { SlidingWindow } from "lib/mq/slidingWindow.ts"; +import { redis } from "lib/db/redis.ts"; +import Redis from "ioredis"; interface Proxy { type: string; @@ -27,7 +30,7 @@ export class NetSchedulerError extends Error { } } -export class NetScheduler { +class NetScheduler { private proxies: ProxiesMap = {}; addProxy(name: string, type: string, task: string): void { @@ -48,10 +51,10 @@ export class NetScheduler { * @param {string} method - The HTTP method to use for the request. Default is "GET". * @returns {Promise} - A promise that resolves to the response body. * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - No available proxy currently: with error code NO_AVAILABLE_PROXY - * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED - * - The native `fetch` function threw an error: with error code FETCH_ERROR - * - The proxy type is not supported: with error code NOT_IMPLEMENTED + * - No available proxy currently: with error code NO_AVAILABLE_PROXY + * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED + * - The native `fetch` function threw an error: with error code FETCH_ERROR + * - The proxy type is not supported: with error code NOT_IMPLEMENTED */ async request(url: string, method: string = "GET", task: string): Promise { // find a available proxy @@ -59,11 +62,7 @@ export class NetScheduler { for (const proxyName of proxiesNames) { const proxy = this.proxies[proxyName]; if (proxy.task !== task) continue; - if (!proxy.limiter) { - return await this.proxyRequest(url, proxyName, method); - } - const proxyIsNotRateLimited = await proxy.limiter.getAvailability(); - if (proxyIsNotRateLimited) { + if (await this.getProxyAvailability(proxyName)) { return await this.proxyRequest(url, proxyName, method); } } @@ -85,17 +84,24 @@ export class NetScheduler { */ async proxyRequest(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise { const proxy = this.proxies[proxyName]; - const limiterExists = proxy.limiter !== undefined; if (!proxy) { throw new NetSchedulerError(`Proxy "${proxy}" not found`, "PROXY_NOT_FOUND"); } - if (!force && limiterExists && !(await proxy.limiter!.getAvailability())) { + if (!force && await this.getProxyAvailability(proxyName) === false) { throw new NetSchedulerError(`Proxy "${proxy}" is rate limited`, "PROXY_RATE_LIMITED"); } - if (limiterExists) { - await proxy.limiter!.trigger(); + if (proxy.limiter) { + try { + await proxy.limiter!.trigger(); + } catch (e) { + const error = e as Error; + if (e instanceof Redis.ReplyError) { + logger.error(error, "redis"); + } + logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest"); + } } switch (proxy.type) { @@ -106,21 +112,52 @@ export class NetScheduler { } } - async getProxyAvailability(name: string): Promise { - const proxyConfig = this.proxies[name]; - if (!proxyConfig || !proxyConfig.limiter) { - return true; + private async getProxyAvailability(name: string): Promise { + try { + const proxyConfig = this.proxies[name]; + if (!proxyConfig || !proxyConfig.limiter) { + return true; + } + return await proxyConfig.limiter.getAvailability(); + } catch (e) { + const error = e as Error; + if (e instanceof Redis.ReplyError) { + logger.error(error, "redis"); + return false; + } + logger.warn(`Unhandled error: ${error.message}`, "mq", "getProxyAvailability"); + return false; } - return await proxyConfig.limiter.getAvailability(); } private async nativeRequest(url: string, method: string): Promise { try { const response = await fetch(url, { method }); - return await response.json() as R; + const data = await response.json() as R; + return data; } catch (e) { logger.error(e as Error); throw new NetSchedulerError("Fetch error", "FETCH_ERROR"); } } } + +const netScheduler = new NetScheduler(); +netScheduler.addProxy("tags-native", "native", "getVideoTags"); +const tagsRateLimiter = new RateLimiter("getVideoTags", [ + { + window: new SlidingWindow(redis, 1.2), + max: 1, + }, + { + window: new SlidingWindow(redis, 30), + max: 5, + }, + { + window: new SlidingWindow(redis, 5 * 60), + max: 70, + }, +]); +netScheduler.setProxyLimiter("tags-native", tagsRateLimiter); + +export default netScheduler; diff --git a/lib/mq/slidingWindow.ts b/lib/mq/slidingWindow.ts index 4cfbece..049a9f0 100644 --- a/lib/mq/slidingWindow.ts +++ b/lib/mq/slidingWindow.ts @@ -4,9 +4,14 @@ export class SlidingWindow { private redis: Redis; private readonly windowSize: number; + /* + * Create a new sliding window + * @param redisClient The Redis client used to store the data + * @param windowSize The size of the window in seconds + */ constructor(redisClient: Redis, windowSize: number) { this.redis = redisClient; - this.windowSize = windowSize; + this.windowSize = windowSize * 1000; } /* @@ -39,8 +44,8 @@ export class SlidingWindow { return this.redis.zcard(key); } - async clear(eventName: string): Promise { + clear(eventName: string): Promise { const key = `cvsa:sliding_window:${eventName}`; - return await this.redis.del(key); + return this.redis.del(key); } } diff --git a/test/mq/rateLimiter.test.ts b/test/mq/rateLimiter.test.ts index c1df8f2..054e945 100644 --- a/test/mq/rateLimiter.test.ts +++ b/test/mq/rateLimiter.test.ts @@ -1,12 +1,11 @@ import {assertEquals} from "jsr:@std/assert"; -import {redis} from "lib/db/redis.ts"; import {SlidingWindow} from "lib/mq/slidingWindow.ts"; import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts"; -import logger from "lib/log/logger.ts"; +import {Redis} from "npm:ioredis@5.5.0"; Deno.test("RateLimiter works correctly", async () => { - await redis.del("cvsa:sliding_window:test_event_config_0"); - const windowSize = 5000; + const redis = new Redis({ maxRetriesPerRequest: null }); + const windowSize = 5; const maxRequests = 10; const slidingWindow = new SlidingWindow(redis, windowSize); @@ -15,36 +14,33 @@ Deno.test("RateLimiter works correctly", async () => { max: maxRequests, }; const rateLimiter = new RateLimiter("test_event", [config]); + await rateLimiter.clear(); // Initial availability should be true assertEquals(await rateLimiter.getAvailability(), true); // Trigger events up to the limit - for (let i = 0; i < maxRequests + 1; i++) { + for (let i = 0; i < maxRequests; i++) { await rateLimiter.trigger(); } - logger.debug(`${await rateLimiter.getAvailability()}`); - // Availability should now be false assertEquals(await rateLimiter.getAvailability(), false); // Wait for the window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize + 500)); // Add a small buffer + await new Promise((resolve) => setTimeout(resolve, windowSize * 1000 + 500)); // Availability should be true again assertEquals(await rateLimiter.getAvailability(), true); - // Clean up Redis after the test (important!) - await redis.del("cvsa:sliding_window:test_event_config_0"); + redis.quit(); }); Deno.test("Multiple configs work correctly", async () => { - await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys - await redis.del("cvsa:sliding_window:test_event_multi_config_1"); - const windowSize1 = 1000; // 1 second window + const redis = new Redis({ maxRetriesPerRequest: null }); + const windowSize1 = 1; const maxRequests1 = 2; - const windowSize2 = 5000; // 2 second window + const windowSize2 = 5; const maxRequests2 = 6; const slidingWindow1 = new SlidingWindow(redis, windowSize1); @@ -58,6 +54,7 @@ Deno.test("Multiple configs work correctly", async () => { max: maxRequests2, }; const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]); + await rateLimiter.clear(); // Initial availability should be true assertEquals(await rateLimiter.getAvailability(), true); @@ -71,10 +68,10 @@ Deno.test("Multiple configs work correctly", async () => { assertEquals(await rateLimiter.getAvailability(), false); // Wait for the first window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize1 + 500)); // Add a small buffer + await new Promise((resolve) => setTimeout(resolve, windowSize1 * 1000 + 500)); // Availability should now be true (due to config1) - assertEquals(await rateLimiter.getAvailability(), true); // Corrected Assertion + assertEquals(await rateLimiter.getAvailability(), true); // Trigger events up to the limit of the second config for (let i = maxRequests1; i < maxRequests2; i++) { @@ -85,12 +82,10 @@ Deno.test("Multiple configs work correctly", async () => { assertEquals(await rateLimiter.getAvailability(), false); // Wait for the second window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize2 + 500)); // Add a small buffer + await new Promise((resolve) => setTimeout(resolve, windowSize2 * 1000 + 500)); // Availability should be true again assertEquals(await rateLimiter.getAvailability(), true); - // Clean up Redis after the test (important!) - await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys - await redis.del("cvsa:sliding_window:test_event_multi_config_1"); + redis.quit(); }); \ No newline at end of file diff --git a/test/mq/slidingWindow.test.ts b/test/mq/slidingWindow.test.ts index c441694..cde8d11 100644 --- a/test/mq/slidingWindow.test.ts +++ b/test/mq/slidingWindow.test.ts @@ -7,7 +7,7 @@ Deno.test("SlidingWindow - event and count", async () => { const windowSize = 5000; // 5 seconds const slidingWindow = new SlidingWindow(redis, windowSize); const eventName = "test_event"; - slidingWindow.clear(eventName); + await slidingWindow.clear(eventName); await slidingWindow.event(eventName); const count = await slidingWindow.count(eventName); @@ -21,7 +21,7 @@ Deno.test("SlidingWindow - multiple events", async () => { const windowSize = 5000; // 5 seconds const slidingWindow = new SlidingWindow(redis, windowSize); const eventName = "test_event"; - slidingWindow.clear(eventName); + await slidingWindow.clear(eventName); await slidingWindow.event(eventName); await slidingWindow.event(eventName); @@ -32,29 +32,12 @@ Deno.test("SlidingWindow - multiple events", async () => { redis.quit(); }); -Deno.test("SlidingWindow - events outside window", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new SlidingWindow(redis, windowSize); - const eventName = "test_event"; - slidingWindow.clear(eventName); - - const now = Date.now(); - await redis.zadd(`cvsa:sliding_window:${eventName}`, now - windowSize - 1000, now - windowSize - 1000); // Event outside the window - await slidingWindow.event(eventName); // Event inside the window - - const count = await slidingWindow.count(eventName); - - assertEquals(count, 1); - redis.quit(); -}); - Deno.test("SlidingWindow - no events", async () => { const redis = new Redis({ maxRetriesPerRequest: null }); const windowSize = 5000; // 5 seconds const slidingWindow = new SlidingWindow(redis, windowSize); const eventName = "test_event"; - slidingWindow.clear(eventName); + await slidingWindow.clear(eventName); const count = await slidingWindow.count(eventName); @@ -68,8 +51,8 @@ Deno.test("SlidingWindow - different event names", async () => { const slidingWindow = new SlidingWindow(redis, windowSize); const eventName1 = "test_event_1"; const eventName2 = "test_event_2"; - slidingWindow.clear(eventName1); - slidingWindow.clear(eventName2); + await slidingWindow.clear(eventName1); + await slidingWindow.clear(eventName2); await slidingWindow.event(eventName1); await slidingWindow.event(eventName2); @@ -87,7 +70,7 @@ Deno.test("SlidingWindow - large number of events", async () => { const windowSize = 5000; // 5 seconds const slidingWindow = new SlidingWindow(redis, windowSize); const eventName = "test_event"; - slidingWindow.clear(eventName); + await slidingWindow.clear(eventName); const numEvents = 1000; for (let i = 0; i < numEvents; i++) {