ref: move from sliding window to token bucket in rate limiter
This commit is contained in:
parent
5fb1355346
commit
137c19d74e
@ -1,56 +1,68 @@
|
||||
import type { SlidingWindow } from "./slidingWindow.ts";
|
||||
import { TokenBucket } from "./tokenBucket.ts";
|
||||
|
||||
export interface RateLimiterConfig {
|
||||
window: SlidingWindow;
|
||||
max: number;
|
||||
duration: number;
|
||||
max: number;
|
||||
}
|
||||
|
||||
export class RateLimiter {
|
||||
private readonly configs: RateLimiterConfig[];
|
||||
private readonly configEventNames: string[];
|
||||
private configs: RateLimiterConfig[] = [];
|
||||
private buckets: TokenBucket[] = [];
|
||||
private identifierFn: (configIndex: number) => string;
|
||||
|
||||
/*
|
||||
* @param name The name of the rate limiter
|
||||
* @param configs The configuration of the rate limiter, containing:
|
||||
* - window: The sliding window to use
|
||||
* - max: The maximum number of events allowed in the window
|
||||
*/
|
||||
constructor(name: string, configs: RateLimiterConfig[]) {
|
||||
this.configs = configs;
|
||||
this.configEventNames = configs.map((_, index) => `${name}_config_${index}`);
|
||||
}
|
||||
/*
|
||||
* @param name The name of the rate limiter
|
||||
* @param configs The configuration of the rate limiter, containing:
|
||||
* - tokenBucket: The token bucket instance
|
||||
* - max: The maximum number of tokens allowed per operation
|
||||
*/
|
||||
constructor(
|
||||
name: string,
|
||||
configs: RateLimiterConfig[],
|
||||
identifierFn?: (configIndex: number) => string
|
||||
) {
|
||||
this.configs = configs;
|
||||
this.identifierFn = identifierFn || ((index) => `${name}_config_${index}`);
|
||||
for (let i = 0; i < configs.length; i++) {
|
||||
const config = configs[i];
|
||||
const bucket = new TokenBucket({
|
||||
capacity: config.max,
|
||||
rate: config.max / config.duration,
|
||||
identifier: this.identifierFn(i),
|
||||
})
|
||||
this.buckets.push(bucket);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the event has reached the rate limit
|
||||
*/
|
||||
async getAvailability(): Promise<boolean> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const config = this.configs[i];
|
||||
const eventName = this.configEventNames[i];
|
||||
const count = await config.window.count(eventName);
|
||||
if (count >= config.max) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
/*
|
||||
* Check if the event has reached the rate limit
|
||||
*/
|
||||
async getAvailability(): Promise<boolean> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const remaining = await this.buckets[i].getRemainingTokens();
|
||||
|
||||
/*
|
||||
* Trigger an event in the rate limiter
|
||||
*/
|
||||
async trigger(): Promise<void> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const config = this.configs[i];
|
||||
const eventName = this.configEventNames[i];
|
||||
await config.window.event(eventName);
|
||||
}
|
||||
}
|
||||
if (remaining === null) {
|
||||
return false; // Rate limit exceeded
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
async clear(): Promise<void> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
const config = this.configs[i];
|
||||
const eventName = this.configEventNames[i];
|
||||
await config.window.clear(eventName);
|
||||
}
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Trigger an event in the rate limiter
|
||||
*/
|
||||
async trigger(): Promise<void> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
await this.buckets[i].consume(1);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Clear all buckets for all configurations
|
||||
*/
|
||||
async clear(): Promise<void> {
|
||||
for (let i = 0; i < this.configs.length; i++) {
|
||||
await this.buckets[i].reset();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
import type { Redis } from "ioredis";
|
||||
|
||||
export class SlidingWindow {
|
||||
private redis: Redis;
|
||||
private readonly windowSize: number;
|
||||
|
||||
/*
|
||||
* Create a new sliding window
|
||||
* @param redisClient The Redis client used to store the data
|
||||
* @param windowSize The size of the window in seconds
|
||||
*/
|
||||
constructor(redisClient: Redis, windowSize: number) {
|
||||
this.redis = redisClient;
|
||||
this.windowSize = windowSize * 1000;
|
||||
}
|
||||
|
||||
/*
|
||||
* Trigger an event in the sliding window
|
||||
* @param eventName The name of the event
|
||||
*/
|
||||
async event(eventName: string): Promise<void> {
|
||||
const now = Date.now();
|
||||
const key = `cvsa:sliding_window:${eventName}`;
|
||||
|
||||
const uniqueMember = `${now}-${Math.random()}`;
|
||||
// Add current timestamp to an ordered set
|
||||
await this.redis.zadd(key, now, uniqueMember);
|
||||
|
||||
// Remove timestamps outside the window
|
||||
await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
|
||||
}
|
||||
|
||||
/*
|
||||
* Count the number of events in the sliding window
|
||||
* @param eventName The name of the event
|
||||
*/
|
||||
async count(eventName: string): Promise<number> {
|
||||
const key = `cvsa:sliding_window:${eventName}`;
|
||||
const now = Date.now();
|
||||
|
||||
// Remove timestamps outside the window
|
||||
await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
|
||||
// Get the number of timestamps in the window
|
||||
return this.redis.zcard(key);
|
||||
}
|
||||
|
||||
clear(eventName: string): Promise<number> {
|
||||
const key = `cvsa:sliding_window:${eventName}`;
|
||||
return this.redis.del(key);
|
||||
}
|
||||
}
|
84
packages/core/mq/tokenBucket.ts
Normal file
84
packages/core/mq/tokenBucket.ts
Normal file
@ -0,0 +1,84 @@
|
||||
import { redis } from "@core/db/redis";
|
||||
import { SECOND } from "@core/const/time";
|
||||
|
||||
interface TokenBucketOptions {
|
||||
capacity: number;
|
||||
rate: number;
|
||||
identifier: string;
|
||||
keyPrefix?: string;
|
||||
}
|
||||
|
||||
export class TokenBucket {
|
||||
private readonly capacity: number;
|
||||
private readonly rate: number;
|
||||
private readonly keyPrefix: string;
|
||||
private readonly identifier: string;
|
||||
|
||||
constructor(options: TokenBucketOptions) {
|
||||
if (options.capacity <= 0 || options.rate <= 0) {
|
||||
throw new Error("Capacity and rate must be greater than zero.");
|
||||
}
|
||||
|
||||
this.capacity = options.capacity;
|
||||
this.rate = options.rate;
|
||||
this.identifier = options.identifier;
|
||||
this.keyPrefix = options.keyPrefix || "cvsa:token_bucket:";
|
||||
}
|
||||
|
||||
getKey(): string {
|
||||
return `${this.keyPrefix}${this.identifier}`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to consume a specified number of tokens
|
||||
* @param count The number of tokens to be consumed
|
||||
* @returns If consumption is successful, returns the number of remaining tokens; otherwise returns null
|
||||
*/
|
||||
public async consume(count: number): Promise<number | null> {
|
||||
const key = this.getKey();
|
||||
const now = Math.floor(Date.now() / SECOND);
|
||||
|
||||
const script = `
|
||||
local tokens_key = KEYS[1]
|
||||
local last_refilled_key = KEYS[2]
|
||||
local now = tonumber(ARGV[1])
|
||||
local count = tonumber(ARGV[2])
|
||||
local capacity = tonumber(ARGV[3])
|
||||
local rate = tonumber(ARGV[4])
|
||||
|
||||
local last_refilled = tonumber(redis.call('GET', last_refilled_key)) or now
|
||||
local current_tokens = tonumber(redis.call('GET', tokens_key)) or capacity
|
||||
|
||||
local elapsed = now - last_refilled
|
||||
local new_tokens = elapsed * rate
|
||||
current_tokens = math.min(capacity, current_tokens + new_tokens)
|
||||
|
||||
if current_tokens >= count then
|
||||
current_tokens = current_tokens - count
|
||||
redis.call('SET', tokens_key, current_tokens)
|
||||
redis.call('SET', last_refilled_key, now)
|
||||
return current_tokens
|
||||
else
|
||||
return nil
|
||||
end
|
||||
`;
|
||||
|
||||
const keys = [`${key}:tokens`, `${key}:last_refilled`];
|
||||
const args = [now, count, this.capacity, this.rate];
|
||||
|
||||
const result = await redis.eval(script, keys.length, ...keys, ...args);
|
||||
|
||||
return result as number | null;
|
||||
}
|
||||
|
||||
public async getRemainingTokens(): Promise<number> {
|
||||
const key = this.getKey();
|
||||
const tokens = await redis.get(`${key}:tokens`);
|
||||
return Number(tokens) || this.capacity;
|
||||
}
|
||||
|
||||
public async reset(): Promise<void> {
|
||||
const key = this.getKey();
|
||||
await redis.del(`${key}:tokens`, `${key}:last_refilled`);
|
||||
}
|
||||
}
|
@ -1,6 +1,5 @@
|
||||
import logger from "@core/log/logger.ts";
|
||||
import { RateLimiter, type RateLimiterConfig } from "mq/rateLimiter.ts";
|
||||
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { ReplyError } from "ioredis";
|
||||
import { SECOND } from "../const/time.ts";
|
||||
@ -316,37 +315,37 @@ class NetworkDelegate {
|
||||
const networkDelegate = new NetworkDelegate();
|
||||
const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
|
||||
{
|
||||
window: new SlidingWindow(redis, 0.3),
|
||||
duration: 0.3,
|
||||
max: 1,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 3),
|
||||
duration: 3,
|
||||
max: 5,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 30),
|
||||
duration: 30,
|
||||
max: 30,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 2 * 60),
|
||||
duration: 2 * 60,
|
||||
max: 50,
|
||||
},
|
||||
];
|
||||
const biliLimiterConfig: RateLimiterConfig[] = [
|
||||
{
|
||||
window: new SlidingWindow(redis, 1),
|
||||
duration: 1,
|
||||
max: 6,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 5),
|
||||
duration: 5,
|
||||
max: 20,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 30),
|
||||
duration: 30,
|
||||
max: 100,
|
||||
},
|
||||
{
|
||||
window: new SlidingWindow(redis, 5 * 60),
|
||||
duration: 5 * 60,
|
||||
max: 200,
|
||||
},
|
||||
];
|
||||
|
Loading…
Reference in New Issue
Block a user