improve: error handling for NetScheduler

fix: incorrect test code
This commit is contained in:
alikia2x (寒寒) 2025-02-10 22:48:37 +08:00
parent e570e3bbff
commit 248978a3e8
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
6 changed files with 98 additions and 69 deletions

View File

@ -1,4 +1,3 @@
import { Redis } from "ioredis"; import { Redis } from "ioredis";
export const redis = new Redis({ maxRetriesPerRequest: null }); export const redis = new Redis({ maxRetriesPerRequest: null });

View File

@ -11,7 +11,9 @@ export class RateLimiter {
/* /*
* @param name The name of the rate limiter * @param name The name of the rate limiter
* @param configs The configuration of the rate limiter * @param configs The configuration of the rate limiter, containing:
* - window: The sliding window to use
* - max: The maximum number of events allowed in the window
*/ */
constructor(name: string, configs: RateLimiterConfig[]) { constructor(name: string, configs: RateLimiterConfig[]) {
this.configs = configs; this.configs = configs;
@ -43,4 +45,12 @@ export class RateLimiter {
await config.window.event(eventName); await config.window.event(eventName);
} }
} }
async clear(): Promise<void> {
for (let i = 0; i < this.configs.length; i++) {
const config = this.configs[i];
const eventName = this.configEventNames[i];
await config.window.clear(eventName);
}
}
} }

View File

@ -1,5 +1,8 @@
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { RateLimiter } from "lib/mq/rateLimiter.ts"; import { RateLimiter } from "lib/mq/rateLimiter.ts";
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
import { redis } from "lib/db/redis.ts";
import Redis from "ioredis";
interface Proxy { interface Proxy {
type: string; type: string;
@ -27,7 +30,7 @@ export class NetSchedulerError extends Error {
} }
} }
export class NetScheduler { class NetScheduler {
private proxies: ProxiesMap = {}; private proxies: ProxiesMap = {};
addProxy(name: string, type: string, task: string): void { addProxy(name: string, type: string, task: string): void {
@ -59,11 +62,7 @@ export class NetScheduler {
for (const proxyName of proxiesNames) { for (const proxyName of proxiesNames) {
const proxy = this.proxies[proxyName]; const proxy = this.proxies[proxyName];
if (proxy.task !== task) continue; if (proxy.task !== task) continue;
if (!proxy.limiter) { if (await this.getProxyAvailability(proxyName)) {
return await this.proxyRequest<R>(url, proxyName, method);
}
const proxyIsNotRateLimited = await proxy.limiter.getAvailability();
if (proxyIsNotRateLimited) {
return await this.proxyRequest<R>(url, proxyName, method); return await this.proxyRequest<R>(url, proxyName, method);
} }
} }
@ -85,17 +84,24 @@ export class NetScheduler {
*/ */
async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> { async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> {
const proxy = this.proxies[proxyName]; const proxy = this.proxies[proxyName];
const limiterExists = proxy.limiter !== undefined;
if (!proxy) { if (!proxy) {
throw new NetSchedulerError(`Proxy "${proxy}" not found`, "PROXY_NOT_FOUND"); throw new NetSchedulerError(`Proxy "${proxy}" not found`, "PROXY_NOT_FOUND");
} }
if (!force && limiterExists && !(await proxy.limiter!.getAvailability())) { if (!force && await this.getProxyAvailability(proxyName) === false) {
throw new NetSchedulerError(`Proxy "${proxy}" is rate limited`, "PROXY_RATE_LIMITED"); throw new NetSchedulerError(`Proxy "${proxy}" is rate limited`, "PROXY_RATE_LIMITED");
} }
if (limiterExists) { if (proxy.limiter) {
try {
await proxy.limiter!.trigger(); await proxy.limiter!.trigger();
} catch (e) {
const error = e as Error;
if (e instanceof Redis.ReplyError) {
logger.error(error, "redis");
}
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
}
} }
switch (proxy.type) { switch (proxy.type) {
@ -106,21 +112,52 @@ export class NetScheduler {
} }
} }
async getProxyAvailability(name: string): Promise<boolean> { private async getProxyAvailability(name: string): Promise<boolean> {
try {
const proxyConfig = this.proxies[name]; const proxyConfig = this.proxies[name];
if (!proxyConfig || !proxyConfig.limiter) { if (!proxyConfig || !proxyConfig.limiter) {
return true; return true;
} }
return await proxyConfig.limiter.getAvailability(); return await proxyConfig.limiter.getAvailability();
} catch (e) {
const error = e as Error;
if (e instanceof Redis.ReplyError) {
logger.error(error, "redis");
return false;
}
logger.warn(`Unhandled error: ${error.message}`, "mq", "getProxyAvailability");
return false;
}
} }
private async nativeRequest<R>(url: string, method: string): Promise<R> { private async nativeRequest<R>(url: string, method: string): Promise<R> {
try { try {
const response = await fetch(url, { method }); const response = await fetch(url, { method });
return await response.json() as R; const data = await response.json() as R;
return data;
} catch (e) { } catch (e) {
logger.error(e as Error); logger.error(e as Error);
throw new NetSchedulerError("Fetch error", "FETCH_ERROR"); throw new NetSchedulerError("Fetch error", "FETCH_ERROR");
} }
} }
} }
const netScheduler = new NetScheduler();
netScheduler.addProxy("tags-native", "native", "getVideoTags");
const tagsRateLimiter = new RateLimiter("getVideoTags", [
{
window: new SlidingWindow(redis, 1.2),
max: 1,
},
{
window: new SlidingWindow(redis, 30),
max: 5,
},
{
window: new SlidingWindow(redis, 5 * 60),
max: 70,
},
]);
netScheduler.setProxyLimiter("tags-native", tagsRateLimiter);
export default netScheduler;

View File

@ -4,9 +4,14 @@ export class SlidingWindow {
private redis: Redis; private redis: Redis;
private readonly windowSize: number; private readonly windowSize: number;
/*
* Create a new sliding window
* @param redisClient The Redis client used to store the data
* @param windowSize The size of the window in seconds
*/
constructor(redisClient: Redis, windowSize: number) { constructor(redisClient: Redis, windowSize: number) {
this.redis = redisClient; this.redis = redisClient;
this.windowSize = windowSize; this.windowSize = windowSize * 1000;
} }
/* /*
@ -39,8 +44,8 @@ export class SlidingWindow {
return this.redis.zcard(key); return this.redis.zcard(key);
} }
async clear(eventName: string): Promise<number> { clear(eventName: string): Promise<number> {
const key = `cvsa:sliding_window:${eventName}`; const key = `cvsa:sliding_window:${eventName}`;
return await this.redis.del(key); return this.redis.del(key);
} }
} }

View File

@ -1,12 +1,11 @@
import {assertEquals} from "jsr:@std/assert"; import {assertEquals} from "jsr:@std/assert";
import {redis} from "lib/db/redis.ts";
import {SlidingWindow} from "lib/mq/slidingWindow.ts"; import {SlidingWindow} from "lib/mq/slidingWindow.ts";
import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts"; import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
import logger from "lib/log/logger.ts"; import {Redis} from "npm:ioredis@5.5.0";
Deno.test("RateLimiter works correctly", async () => { Deno.test("RateLimiter works correctly", async () => {
await redis.del("cvsa:sliding_window:test_event_config_0"); const redis = new Redis({ maxRetriesPerRequest: null });
const windowSize = 5000; const windowSize = 5;
const maxRequests = 10; const maxRequests = 10;
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
@ -15,36 +14,33 @@ Deno.test("RateLimiter works correctly", async () => {
max: maxRequests, max: maxRequests,
}; };
const rateLimiter = new RateLimiter("test_event", [config]); const rateLimiter = new RateLimiter("test_event", [config]);
await rateLimiter.clear();
// Initial availability should be true // Initial availability should be true
assertEquals(await rateLimiter.getAvailability(), true); assertEquals(await rateLimiter.getAvailability(), true);
// Trigger events up to the limit // Trigger events up to the limit
for (let i = 0; i < maxRequests + 1; i++) { for (let i = 0; i < maxRequests; i++) {
await rateLimiter.trigger(); await rateLimiter.trigger();
} }
logger.debug(`${await rateLimiter.getAvailability()}`);
// Availability should now be false // Availability should now be false
assertEquals(await rateLimiter.getAvailability(), false); assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the window to slide // Wait for the window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize + 500)); // Add a small buffer await new Promise((resolve) => setTimeout(resolve, windowSize * 1000 + 500));
// Availability should be true again // Availability should be true again
assertEquals(await rateLimiter.getAvailability(), true); assertEquals(await rateLimiter.getAvailability(), true);
// Clean up Redis after the test (important!) redis.quit();
await redis.del("cvsa:sliding_window:test_event_config_0");
}); });
Deno.test("Multiple configs work correctly", async () => { Deno.test("Multiple configs work correctly", async () => {
await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys const redis = new Redis({ maxRetriesPerRequest: null });
await redis.del("cvsa:sliding_window:test_event_multi_config_1"); const windowSize1 = 1;
const windowSize1 = 1000; // 1 second window
const maxRequests1 = 2; const maxRequests1 = 2;
const windowSize2 = 5000; // 2 second window const windowSize2 = 5;
const maxRequests2 = 6; const maxRequests2 = 6;
const slidingWindow1 = new SlidingWindow(redis, windowSize1); const slidingWindow1 = new SlidingWindow(redis, windowSize1);
@ -58,6 +54,7 @@ Deno.test("Multiple configs work correctly", async () => {
max: maxRequests2, max: maxRequests2,
}; };
const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]); const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]);
await rateLimiter.clear();
// Initial availability should be true // Initial availability should be true
assertEquals(await rateLimiter.getAvailability(), true); assertEquals(await rateLimiter.getAvailability(), true);
@ -71,10 +68,10 @@ Deno.test("Multiple configs work correctly", async () => {
assertEquals(await rateLimiter.getAvailability(), false); assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the first window to slide // Wait for the first window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize1 + 500)); // Add a small buffer await new Promise((resolve) => setTimeout(resolve, windowSize1 * 1000 + 500));
// Availability should now be true (due to config1) // Availability should now be true (due to config1)
assertEquals(await rateLimiter.getAvailability(), true); // Corrected Assertion assertEquals(await rateLimiter.getAvailability(), true);
// Trigger events up to the limit of the second config // Trigger events up to the limit of the second config
for (let i = maxRequests1; i < maxRequests2; i++) { for (let i = maxRequests1; i < maxRequests2; i++) {
@ -85,12 +82,10 @@ Deno.test("Multiple configs work correctly", async () => {
assertEquals(await rateLimiter.getAvailability(), false); assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the second window to slide // Wait for the second window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize2 + 500)); // Add a small buffer await new Promise((resolve) => setTimeout(resolve, windowSize2 * 1000 + 500));
// Availability should be true again // Availability should be true again
assertEquals(await rateLimiter.getAvailability(), true); assertEquals(await rateLimiter.getAvailability(), true);
// Clean up Redis after the test (important!) redis.quit();
await redis.del("cvsa:sliding_window:test_event_multi_config_0"); // Corrected keys
await redis.del("cvsa:sliding_window:test_event_multi_config_1");
}); });

View File

@ -7,7 +7,7 @@ Deno.test("SlidingWindow - event and count", async () => {
const windowSize = 5000; // 5 seconds const windowSize = 5000; // 5 seconds
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName = "test_event"; const eventName = "test_event";
slidingWindow.clear(eventName); await slidingWindow.clear(eventName);
await slidingWindow.event(eventName); await slidingWindow.event(eventName);
const count = await slidingWindow.count(eventName); const count = await slidingWindow.count(eventName);
@ -21,7 +21,7 @@ Deno.test("SlidingWindow - multiple events", async () => {
const windowSize = 5000; // 5 seconds const windowSize = 5000; // 5 seconds
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName = "test_event"; const eventName = "test_event";
slidingWindow.clear(eventName); await slidingWindow.clear(eventName);
await slidingWindow.event(eventName); await slidingWindow.event(eventName);
await slidingWindow.event(eventName); await slidingWindow.event(eventName);
@ -32,29 +32,12 @@ Deno.test("SlidingWindow - multiple events", async () => {
redis.quit(); redis.quit();
}); });
Deno.test("SlidingWindow - events outside window", async () => {
const redis = new Redis({ maxRetriesPerRequest: null });
const windowSize = 5000; // 5 seconds
const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName = "test_event";
slidingWindow.clear(eventName);
const now = Date.now();
await redis.zadd(`cvsa:sliding_window:${eventName}`, now - windowSize - 1000, now - windowSize - 1000); // Event outside the window
await slidingWindow.event(eventName); // Event inside the window
const count = await slidingWindow.count(eventName);
assertEquals(count, 1);
redis.quit();
});
Deno.test("SlidingWindow - no events", async () => { Deno.test("SlidingWindow - no events", async () => {
const redis = new Redis({ maxRetriesPerRequest: null }); const redis = new Redis({ maxRetriesPerRequest: null });
const windowSize = 5000; // 5 seconds const windowSize = 5000; // 5 seconds
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName = "test_event"; const eventName = "test_event";
slidingWindow.clear(eventName); await slidingWindow.clear(eventName);
const count = await slidingWindow.count(eventName); const count = await slidingWindow.count(eventName);
@ -68,8 +51,8 @@ Deno.test("SlidingWindow - different event names", async () => {
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName1 = "test_event_1"; const eventName1 = "test_event_1";
const eventName2 = "test_event_2"; const eventName2 = "test_event_2";
slidingWindow.clear(eventName1); await slidingWindow.clear(eventName1);
slidingWindow.clear(eventName2); await slidingWindow.clear(eventName2);
await slidingWindow.event(eventName1); await slidingWindow.event(eventName1);
await slidingWindow.event(eventName2); await slidingWindow.event(eventName2);
@ -87,7 +70,7 @@ Deno.test("SlidingWindow - large number of events", async () => {
const windowSize = 5000; // 5 seconds const windowSize = 5000; // 5 seconds
const slidingWindow = new SlidingWindow(redis, windowSize); const slidingWindow = new SlidingWindow(redis, windowSize);
const eventName = "test_event"; const eventName = "test_event";
slidingWindow.clear(eventName); await slidingWindow.clear(eventName);
const numEvents = 1000; const numEvents = 1000;
for (let i = 0; i < numEvents; i++) { for (let i = 0; i < numEvents; i++) {