update: rate limiter
This commit is contained in:
parent
c5ba673069
commit
6d946f74df
@ -6,6 +6,15 @@ export interface RateLimiterConfig {
|
||||
max: number;
|
||||
}
|
||||
|
||||
export class RateLimiterError extends Error {
|
||||
public code: string;
|
||||
constructor(message: string) {
|
||||
super(message);
|
||||
this.name = "RateLimiterError";
|
||||
this.code = "RATE_LIMIT_EXCEEDED";
|
||||
}
|
||||
}
|
||||
|
||||
export class MultipleRateLimiter {
|
||||
private readonly name: string;
|
||||
private readonly configs: RateLimiterConfig[] = [];
|
||||
@ -26,38 +35,21 @@ export class MultipleRateLimiter {
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the event has reached the rate limit
|
||||
*/
|
||||
async getAvailability(): Promise<boolean> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const { duration, max } = this.configs[i];
|
||||
const { remaining } = await this.limiter.allow(`cvsa:${this.name}_${i}`, {
|
||||
burst: max,
|
||||
ratePerPeriod: max,
|
||||
period: duration,
|
||||
cost: 0
|
||||
});
|
||||
|
||||
if (remaining < 1) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Trigger an event in the rate limiter
|
||||
*/
|
||||
async trigger(): Promise<void> {
|
||||
async trigger(shouldThrow = true): Promise<void> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const { duration, max } = this.configs[i];
|
||||
await this.limiter.allow(`cvsa:${this.name}_${i}`, {
|
||||
const { allowed } = await this.limiter.allow(`cvsa:${this.name}_${i}`, {
|
||||
burst: max,
|
||||
ratePerPeriod: max,
|
||||
period: duration,
|
||||
cost: 1
|
||||
});
|
||||
if (!allowed && shouldThrow) {
|
||||
throw new RateLimiterError("Rate limit exceeded")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
import logger from "@core/log/logger.ts";
|
||||
import { MultipleRateLimiter, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts";
|
||||
import { MultipleRateLimiter, RateLimiterError, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts";
|
||||
import { ReplyError } from "ioredis";
|
||||
import { SECOND } from "@core/const/time.ts";
|
||||
import { spawn, SpawnOptions } from "child_process";
|
||||
@ -123,16 +123,19 @@ class NetworkDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
async triggerLimiter(task: string, proxy: string): Promise<void> {
|
||||
async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise<void> {
|
||||
const limiterId = "proxy-" + proxy + "-" + task;
|
||||
const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
|
||||
try {
|
||||
await this.proxyLimiters[limiterId]?.trigger();
|
||||
await this.providerLimiters[providerLimiterId]?.trigger();
|
||||
await this.proxyLimiters[limiterId]?.trigger(!force);
|
||||
await this.providerLimiters[providerLimiterId]?.trigger(!force);
|
||||
} catch (e) {
|
||||
const error = e as Error;
|
||||
if (e instanceof ReplyError) {
|
||||
logger.error(error, "redis");
|
||||
} else if (e instanceof RateLimiterError) {
|
||||
// Re-throw it to ensure this.request can catch it
|
||||
throw e;
|
||||
}
|
||||
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
|
||||
}
|
||||
@ -166,9 +169,15 @@ class NetworkDelegate {
|
||||
// find a available proxy
|
||||
const proxiesNames = this.getTaskProxies(task);
|
||||
for (const proxyName of shuffleArray(proxiesNames)) {
|
||||
if (await this.getProxyAvailability(proxyName, task)) {
|
||||
try {
|
||||
return await this.proxyRequest<R>(url, proxyName, task, method);
|
||||
}
|
||||
catch (e) {
|
||||
if (e instanceof RateLimiterError) {
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
|
||||
}
|
||||
@ -200,16 +209,8 @@ class NetworkDelegate {
|
||||
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
|
||||
}
|
||||
|
||||
if (!force) {
|
||||
const isAvailable = await this.getProxyAvailability(proxyName, task);
|
||||
const limiter = "proxy-" + proxyName + "-" + task;
|
||||
if (!isAvailable) {
|
||||
throw new NetSchedulerError(`Proxy "${limiter}" is rate limited`, "PROXY_RATE_LIMITED");
|
||||
}
|
||||
}
|
||||
|
||||
await this.triggerLimiter(task, proxyName, force);
|
||||
const result = await this.makeRequest<R>(url, proxy, method);
|
||||
await this.triggerLimiter(task, proxyName);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -224,32 +225,6 @@ class NetworkDelegate {
|
||||
}
|
||||
}
|
||||
|
||||
private async getProxyAvailability(proxyName: string, taskName: string): Promise<boolean> {
|
||||
try {
|
||||
const task = this.tasks[taskName];
|
||||
const provider = task.provider;
|
||||
const proxyLimiterId = "proxy-" + proxyName + "-" + task;
|
||||
const providerLimiterId = "provider-" + proxyName + "-" + provider;
|
||||
if (!this.proxyLimiters[proxyLimiterId]) {
|
||||
const providerLimiter = this.providerLimiters[providerLimiterId];
|
||||
return await providerLimiter.getAvailability();
|
||||
}
|
||||
const proxyLimiter = this.proxyLimiters[proxyLimiterId];
|
||||
const providerLimiter = this.providerLimiters[providerLimiterId];
|
||||
const providerAvailable = await providerLimiter.getAvailability();
|
||||
const proxyAvailable = await proxyLimiter.getAvailability();
|
||||
return providerAvailable && proxyAvailable;
|
||||
} catch (e) {
|
||||
const error = e as Error;
|
||||
if (e instanceof ReplyError) {
|
||||
logger.error(error, "redis");
|
||||
return false;
|
||||
}
|
||||
logger.error(error, "mq", "getProxyAvailability");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private async nativeRequest<R>(url: string, method: string): Promise<R> {
|
||||
try {
|
||||
const controller = new AbortController();
|
||||
|
Loading…
Reference in New Issue
Block a user