From 137c19d74e7693f50a4108b87567ebd0b48eb6c7 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 11 May 2025 01:50:02 +0800 Subject: [PATCH] ref: move from sliding window to token bucket in rate limiter --- packages/core/mq/rateLimiter.ts | 106 +++++++++++++++++------------- packages/core/mq/slidingWindow.ts | 51 -------------- packages/core/mq/tokenBucket.ts | 84 +++++++++++++++++++++++ packages/core/net/delegate.ts | 17 +++-- 4 files changed, 151 insertions(+), 107 deletions(-) delete mode 100644 packages/core/mq/slidingWindow.ts create mode 100644 packages/core/mq/tokenBucket.ts diff --git a/packages/core/mq/rateLimiter.ts b/packages/core/mq/rateLimiter.ts index ac42748..96f208c 100644 --- a/packages/core/mq/rateLimiter.ts +++ b/packages/core/mq/rateLimiter.ts @@ -1,56 +1,68 @@ -import type { SlidingWindow } from "./slidingWindow.ts"; +import { TokenBucket } from "./tokenBucket.ts"; export interface RateLimiterConfig { - window: SlidingWindow; - max: number; + duration: number; + max: number; } export class RateLimiter { - private readonly configs: RateLimiterConfig[]; - private readonly configEventNames: string[]; + private configs: RateLimiterConfig[] = []; + private buckets: TokenBucket[] = []; + private identifierFn: (configIndex: number) => string; - /* - * @param name The name of the rate limiter - * @param configs The configuration of the rate limiter, containing: - * - window: The sliding window to use - * - max: The maximum number of events allowed in the window - */ - constructor(name: string, configs: RateLimiterConfig[]) { - this.configs = configs; - this.configEventNames = configs.map((_, index) => `${name}_config_${index}`); - } + /* + * @param name The name of the rate limiter + * @param configs The configuration of the rate limiter, containing: + * - tokenBucket: The token bucket instance + * - max: The maximum number of tokens allowed per operation + */ + constructor( + name: string, + configs: RateLimiterConfig[], + identifierFn?: (configIndex: number) => string + ) { + this.configs = configs; + this.identifierFn = identifierFn || ((index) => `${name}_config_${index}`); + for (let i = 0; i < configs.length; i++) { + const config = configs[i]; + const bucket = new TokenBucket({ + capacity: config.max, + rate: config.max / config.duration, + identifier: this.identifierFn(i), + }) + this.buckets.push(bucket); + } + } - /* - * Check if the event has reached the rate limit - */ - async getAvailability(): Promise { - for (let i = 0; i < this.configs.length; i++) { - const config = this.configs[i]; - const eventName = this.configEventNames[i]; - const count = await config.window.count(eventName); - if (count >= config.max) { - return false; - } - } - return true; - } + /* + * Check if the event has reached the rate limit + */ + async getAvailability(): Promise { + for (let i = 0; i < this.configs.length; i++) { + const remaining = await this.buckets[i].getRemainingTokens(); - /* - * Trigger an event in the rate limiter - */ - async trigger(): Promise { - for (let i = 0; i < this.configs.length; i++) { - const config = this.configs[i]; - const eventName = this.configEventNames[i]; - await config.window.event(eventName); - } - } + if (remaining === null) { + return false; // Rate limit exceeded + } + } + return true; + } - async clear(): Promise { - for (let i = 0; i < this.configs.length; i++) { - const config = this.configs[i]; - const eventName = this.configEventNames[i]; - await config.window.clear(eventName); - } - } -} + /* + * Trigger an event in the rate limiter + */ + async trigger(): Promise { + for (let i = 0; i < this.configs.length; i++) { + await this.buckets[i].consume(1); + } + } + + /* + * Clear all buckets for all configurations + */ + async clear(): Promise { + for (let i = 0; i < this.configs.length; i++) { + await this.buckets[i].reset(); + } + } +} \ No newline at end of file diff --git a/packages/core/mq/slidingWindow.ts b/packages/core/mq/slidingWindow.ts deleted file mode 100644 index 457303c..0000000 --- a/packages/core/mq/slidingWindow.ts +++ /dev/null @@ -1,51 +0,0 @@ -import type { Redis } from "ioredis"; - -export class SlidingWindow { - private redis: Redis; - private readonly windowSize: number; - - /* - * Create a new sliding window - * @param redisClient The Redis client used to store the data - * @param windowSize The size of the window in seconds - */ - constructor(redisClient: Redis, windowSize: number) { - this.redis = redisClient; - this.windowSize = windowSize * 1000; - } - - /* - * Trigger an event in the sliding window - * @param eventName The name of the event - */ - async event(eventName: string): Promise { - const now = Date.now(); - const key = `cvsa:sliding_window:${eventName}`; - - const uniqueMember = `${now}-${Math.random()}`; - // Add current timestamp to an ordered set - await this.redis.zadd(key, now, uniqueMember); - - // Remove timestamps outside the window - await this.redis.zremrangebyscore(key, 0, now - this.windowSize); - } - - /* - * Count the number of events in the sliding window - * @param eventName The name of the event - */ - async count(eventName: string): Promise { - const key = `cvsa:sliding_window:${eventName}`; - const now = Date.now(); - - // Remove timestamps outside the window - await this.redis.zremrangebyscore(key, 0, now - this.windowSize); - // Get the number of timestamps in the window - return this.redis.zcard(key); - } - - clear(eventName: string): Promise { - const key = `cvsa:sliding_window:${eventName}`; - return this.redis.del(key); - } -} diff --git a/packages/core/mq/tokenBucket.ts b/packages/core/mq/tokenBucket.ts new file mode 100644 index 0000000..8e94db9 --- /dev/null +++ b/packages/core/mq/tokenBucket.ts @@ -0,0 +1,84 @@ +import { redis } from "@core/db/redis"; +import { SECOND } from "@core/const/time"; + +interface TokenBucketOptions { + capacity: number; + rate: number; + identifier: string; + keyPrefix?: string; +} + +export class TokenBucket { + private readonly capacity: number; + private readonly rate: number; + private readonly keyPrefix: string; + private readonly identifier: string; + + constructor(options: TokenBucketOptions) { + if (options.capacity <= 0 || options.rate <= 0) { + throw new Error("Capacity and rate must be greater than zero."); + } + + this.capacity = options.capacity; + this.rate = options.rate; + this.identifier = options.identifier; + this.keyPrefix = options.keyPrefix || "cvsa:token_bucket:"; + } + + getKey(): string { + return `${this.keyPrefix}${this.identifier}`; + } + + /** + * Try to consume a specified number of tokens + * @param count The number of tokens to be consumed + * @returns If consumption is successful, returns the number of remaining tokens; otherwise returns null + */ + public async consume(count: number): Promise { + const key = this.getKey(); + const now = Math.floor(Date.now() / SECOND); + + const script = ` + local tokens_key = KEYS[1] + local last_refilled_key = KEYS[2] + local now = tonumber(ARGV[1]) + local count = tonumber(ARGV[2]) + local capacity = tonumber(ARGV[3]) + local rate = tonumber(ARGV[4]) + + local last_refilled = tonumber(redis.call('GET', last_refilled_key)) or now + local current_tokens = tonumber(redis.call('GET', tokens_key)) or capacity + + local elapsed = now - last_refilled + local new_tokens = elapsed * rate + current_tokens = math.min(capacity, current_tokens + new_tokens) + + if current_tokens >= count then + current_tokens = current_tokens - count + redis.call('SET', tokens_key, current_tokens) + redis.call('SET', last_refilled_key, now) + return current_tokens + else + return nil + end + `; + + const keys = [`${key}:tokens`, `${key}:last_refilled`]; + const args = [now, count, this.capacity, this.rate]; + + const result = await redis.eval(script, keys.length, ...keys, ...args); + + return result as number | null; + } + + public async getRemainingTokens(): Promise { + const key = this.getKey(); + const tokens = await redis.get(`${key}:tokens`); + return Number(tokens) || this.capacity; + } + + public async reset(): Promise { + const key = this.getKey(); + await redis.del(`${key}:tokens`, `${key}:last_refilled`); + } +} diff --git a/packages/core/net/delegate.ts b/packages/core/net/delegate.ts index e7ca3f1..e1518c0 100644 --- a/packages/core/net/delegate.ts +++ b/packages/core/net/delegate.ts @@ -1,6 +1,5 @@ import logger from "@core/log/logger.ts"; import { RateLimiter, type RateLimiterConfig } from "mq/rateLimiter.ts"; -import { SlidingWindow } from "mq/slidingWindow.ts"; import { redis } from "db/redis.ts"; import { ReplyError } from "ioredis"; import { SECOND } from "../const/time.ts"; @@ -316,37 +315,37 @@ class NetworkDelegate { const networkDelegate = new NetworkDelegate(); const videoInfoRateLimiterConfig: RateLimiterConfig[] = [ { - window: new SlidingWindow(redis, 0.3), + duration: 0.3, max: 1, }, { - window: new SlidingWindow(redis, 3), + duration: 3, max: 5, }, { - window: new SlidingWindow(redis, 30), + duration: 30, max: 30, }, { - window: new SlidingWindow(redis, 2 * 60), + duration: 2 * 60, max: 50, }, ]; const biliLimiterConfig: RateLimiterConfig[] = [ { - window: new SlidingWindow(redis, 1), + duration: 1, max: 6, }, { - window: new SlidingWindow(redis, 5), + duration: 5, max: 20, }, { - window: new SlidingWindow(redis, 30), + duration: 30, max: 100, }, { - window: new SlidingWindow(redis, 5 * 60), + duration: 5 * 60, max: 200, }, ];