diff --git a/packages/core/mq/multipleRateLimiter.ts b/packages/core/mq/multipleRateLimiter.ts index c5f7bd1..5c9333e 100644 --- a/packages/core/mq/multipleRateLimiter.ts +++ b/packages/core/mq/multipleRateLimiter.ts @@ -6,6 +6,15 @@ export interface RateLimiterConfig { max: number; } +export class RateLimiterError extends Error { + public code: string; + constructor(message: string) { + super(message); + this.name = "RateLimiterError"; + this.code = "RATE_LIMIT_EXCEEDED"; + } +} + export class MultipleRateLimiter { private readonly name: string; private readonly configs: RateLimiterConfig[] = []; @@ -26,38 +35,21 @@ export class MultipleRateLimiter { this.name = name; } - /* - * Check if the event has reached the rate limit - */ - async getAvailability(): Promise { - for (let i = 0; i < this.configs.length; i++) { - const { duration, max } = this.configs[i]; - const { remaining } = await this.limiter.allow(`cvsa:${this.name}_${i}`, { - burst: max, - ratePerPeriod: max, - period: duration, - cost: 0 - }); - - if (remaining < 1) { - return false; - } - } - return true; - } - /* * Trigger an event in the rate limiter */ - async trigger(): Promise { + async trigger(shouldThrow = true): Promise { for (let i = 0; i < this.configs.length; i++) { const { duration, max } = this.configs[i]; - await this.limiter.allow(`cvsa:${this.name}_${i}`, { + const { allowed } = await this.limiter.allow(`cvsa:${this.name}_${i}`, { burst: max, ratePerPeriod: max, period: duration, cost: 1 }); + if (!allowed && shouldThrow) { + throw new RateLimiterError("Rate limit exceeded") + } } } } \ No newline at end of file diff --git a/packages/core/net/delegate.ts b/packages/core/net/delegate.ts index a89934d..0c5d856 100644 --- a/packages/core/net/delegate.ts +++ b/packages/core/net/delegate.ts @@ -1,5 +1,5 @@ import logger from "@core/log/logger.ts"; -import { MultipleRateLimiter, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts"; +import { MultipleRateLimiter, RateLimiterError, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts"; import { ReplyError } from "ioredis"; import { SECOND } from "@core/const/time.ts"; import { spawn, SpawnOptions } from "child_process"; @@ -123,16 +123,19 @@ class NetworkDelegate { } } - async triggerLimiter(task: string, proxy: string): Promise { + async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise { const limiterId = "proxy-" + proxy + "-" + task; const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider; try { - await this.proxyLimiters[limiterId]?.trigger(); - await this.providerLimiters[providerLimiterId]?.trigger(); + await this.proxyLimiters[limiterId]?.trigger(!force); + await this.providerLimiters[providerLimiterId]?.trigger(!force); } catch (e) { const error = e as Error; if (e instanceof ReplyError) { logger.error(error, "redis"); + } else if (e instanceof RateLimiterError) { + // Re-throw it to ensure this.request can catch it + throw e; } logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest"); } @@ -166,9 +169,15 @@ class NetworkDelegate { // find a available proxy const proxiesNames = this.getTaskProxies(task); for (const proxyName of shuffleArray(proxiesNames)) { - if (await this.getProxyAvailability(proxyName, task)) { + try { return await this.proxyRequest(url, proxyName, task, method); } + catch (e) { + if (e instanceof RateLimiterError) { + continue; + } + throw e; + } } throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE"); } @@ -200,16 +209,8 @@ class NetworkDelegate { throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); } - if (!force) { - const isAvailable = await this.getProxyAvailability(proxyName, task); - const limiter = "proxy-" + proxyName + "-" + task; - if (!isAvailable) { - throw new NetSchedulerError(`Proxy "${limiter}" is rate limited`, "PROXY_RATE_LIMITED"); - } - } - + await this.triggerLimiter(task, proxyName, force); const result = await this.makeRequest(url, proxy, method); - await this.triggerLimiter(task, proxyName); return result; } @@ -224,32 +225,6 @@ class NetworkDelegate { } } - private async getProxyAvailability(proxyName: string, taskName: string): Promise { - try { - const task = this.tasks[taskName]; - const provider = task.provider; - const proxyLimiterId = "proxy-" + proxyName + "-" + task; - const providerLimiterId = "provider-" + proxyName + "-" + provider; - if (!this.proxyLimiters[proxyLimiterId]) { - const providerLimiter = this.providerLimiters[providerLimiterId]; - return await providerLimiter.getAvailability(); - } - const proxyLimiter = this.proxyLimiters[proxyLimiterId]; - const providerLimiter = this.providerLimiters[providerLimiterId]; - const providerAvailable = await providerLimiter.getAvailability(); - const proxyAvailable = await proxyLimiter.getAvailability(); - return providerAvailable && proxyAvailable; - } catch (e) { - const error = e as Error; - if (e instanceof ReplyError) { - logger.error(error, "redis"); - return false; - } - logger.error(error, "mq", "getProxyAvailability"); - return false; - } - } - private async nativeRequest(url: string, method: string): Promise { try { const controller = new AbortController();