add: the net scheduler

This commit is contained in:
alikia2x (寒寒) 2025-02-10 21:32:03 +08:00
parent 04c7cca79d
commit 0a34c9a444
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
11 changed files with 515 additions and 47 deletions

View File

@ -13,7 +13,8 @@
"worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write worker.ts", "worker": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write worker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net jobAdder.ts", "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net bullui.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net bullui.ts",
"all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'" "all": "concurrently 'deno task start' 'deno task worker' 'deno task adder' 'deno task bullui'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
}, },
"lint": { "lint": {
"rules": { "rules": {

View File

@ -0,0 +1,44 @@
import { Job } from "bullmq";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "lib/log/logger.ts";
// Scheduler delay in minutes, indexed by the failure count computed in executeTask
// (which bounds it with truncate(..., 0, 5) and multiplies the entry by MINUTE).
// Index 0 is never read there, because rescheduling only happens for a non-zero count.
const delayMap = [5, 10, 15, 30, 60, 60];
/**
 * Re-register the "getLatestVideos" job scheduler with a new repeat interval.
 *
 * @param failedCount Failure count stored in the data of the jobs the scheduler produces.
 * @param delay Repeat interval handed to the scheduler as `every`; logged in minutes (delay / MINUTE).
 */
const updateQueueInterval = async (failedCount: number, delay: number) => {
	const delayInMinutes = (delay / MINUTE).toFixed(2);
	logger.log(`job:getLatestVideos added to queue, delay: ${delayInMinutes} minutes.`, "mq");

	const repeatOptions = { every: delay };
	const jobTemplate = { data: { failedCount } };
	await MainQueue.upsertJobScheduler("getLatestVideos", repeatOptions, jobTemplate);
};
/**
 * Run insertLatestVideos once and, when it reports a non-zero result, bump the failure
 * count and reschedule the job with the matching entry of delayMap.
 *
 * @param client Database client passed through to insertLatestVideos.
 * @param failedCount Failure count carried by the job that triggered this run.
 */
const executeTask = async (client: Client, failedCount: number) => {
	logger.log("getLatestVideos now executing", "task");
	const result = await insertLatestVideos(client);

	// A zero result resets the count, and a zero count never touches the scheduler.
	if (result === 0) {
		return;
	}

	const nextFailedCount = truncate(failedCount + 1, 0, 5);
	if (nextFailedCount === 0) {
		return;
	}
	await updateQueueInterval(nextFailedCount, delayMap[nextFailedCount] * MINUTE);
};
/**
 * Worker entry point for "getLatestVideos" jobs: takes a client from `db`, runs the task
 * and always releases the client afterwards.
 *
 * @param job Job whose data may carry `failedCount`; a missing value counts as 0.
 */
export const getLatestVideosWorker = async (job: Job) => {
	const previousFailures: number = job.data.failedCount ?? 0;
	const connection = await db.connect();
	try {
		await executeTask(connection, previousFailures);
	} finally {
		// Release on success and on failure alike.
		connection.release();
	}
};

View File

@ -0,0 +1,48 @@
import { Job } from "bullmq";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "lib/log/logger.ts";
// Scheduler delay in minutes, indexed by the failure count computed in executeTask
// (which bounds it with truncate(..., 0, 5) and multiplies the entry by MINUTE).
// Index 0 is never read there, because rescheduling only happens for a non-zero count.
const delayMap = [5, 10, 15, 30, 60, 60];
/**
 * Re-register the "getVideoTags" job scheduler with a new repeat interval.
 *
 * @param failedCount Failure count stored in the data of the jobs the scheduler produces.
 * @param delay Repeat interval handed to the scheduler as `every`; logged in minutes (delay / MINUTE).
 */
const updateQueueInterval = async (failedCount: number, delay: number) => {
	const delayInMinutes = (delay / MINUTE).toFixed(2);
	logger.log(`job:getVideoTags added to queue, delay: ${delayInMinutes} minutes.`, "mq");

	// NOTE(review): the job template carries only failedCount, no `aid` — confirm this is intended.
	const repeatOptions = { every: delay };
	const jobTemplate = { data: { failedCount } };
	await MainQueue.upsertJobScheduler("getVideoTags", repeatOptions, jobTemplate);
};
/**
 * Run insertLatestVideos once and, when it reports a non-zero result, bump the failure
 * count and reschedule the job with the matching entry of delayMap.
 *
 * NOTE(review): this lives in the video-tags executor but still logs and runs the
 * latest-videos task — looks like a placeholder; confirm before relying on it.
 *
 * @param client Database client passed through to insertLatestVideos.
 * @param failedCount Failure count carried by the job that triggered this run.
 */
const executeTask = async (client: Client, failedCount: number) => {
	logger.log("getLatestVideos now executing", "task");
	const result = await insertLatestVideos(client);

	// A zero result resets the count, and a zero count never touches the scheduler.
	if (result === 0) {
		return;
	}

	const nextFailedCount = truncate(failedCount + 1, 0, 5);
	if (nextFailedCount === 0) {
		return;
	}
	await updateQueueInterval(nextFailedCount, delayMap[nextFailedCount] * MINUTE);
};
/**
 * Worker entry point for "getVideoTags" jobs.
 *
 * Jobs without an `aid` in their data are skipped. Otherwise a client is taken from `db`,
 * the task is executed, and the client is always released afterwards.
 *
 * @param job Job whose data should carry `aid` and may carry `failedCount` (missing counts as 0).
 */
export const getVideoTagsWorker = async (job: Job) => {
	const failedCount = (job.data.failedCount ?? 0) as number;
	const aid = job.data.aid;
	// Validate the payload BEFORE taking a client: returning early after db.connect()
	// would skip release() and leak the connection.
	if (!aid) {
		return;
	}
	const client = await db.connect();
	try {
		// NOTE(review): `aid` is only checked, not forwarded to the task — confirm this is intended.
		await executeTask(client, failedCount);
	} finally {
		client.release();
	}
	return;
};

View File

@ -1,44 +1 @@
import { Job } from "bullmq"; export * from "lib/mq/exec/getLatestVideos.ts";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "lib/log/logger.ts";
// Scheduler delay in minutes, indexed by the failure count computed in executeTask
// (which bounds it with truncate(..., 0, 5) and multiplies the entry by MINUTE).
// Index 0 is never read there, because rescheduling only happens for a non-zero count.
const delayMap = [5, 10, 15, 30, 60, 60];
/**
 * Re-register the "getLatestVideos" job scheduler with a new repeat interval.
 *
 * The function stays synchronous for its callers: the scheduler update is started but not
 * awaited. A rejection is logged here so it cannot surface as an unhandled promise rejection.
 *
 * @param failedCount Failure count stored in the data of the jobs the scheduler produces.
 * @param delay Repeat interval handed to the scheduler as `every`; logged in minutes (delay / MINUTE).
 */
const addJobToQueue = (failedCount: number, delay: number) => {
	logger.log(`job:getLatestVideos added to queue, delay: ${(delay / MINUTE).toFixed(2)} minutes.`, "mq");
	MainQueue.upsertJobScheduler("getLatestVideos", {
		every: delay,
	}, {
		data: {
			failedCount: failedCount,
		},
	}).catch((e: unknown) => {
		// Fire-and-forget: report the failure instead of leaving the rejection unhandled.
		logger.error(e as Error);
	});
	return;
};
/**
 * Worker entry point for "getLatestVideos" jobs: takes a client from `db`, runs the task
 * and always releases the client afterwards.
 *
 * @param job Job whose data may carry `failedCount`; a missing value counts as 0.
 */
export const insertVideosWorker = async (job: Job) => {
	const previousFailures: number = job.data.failedCount ?? 0;
	const connection = await db.connect();
	try {
		await executeTask(connection, previousFailures);
	} finally {
		// Release on success and on failure alike.
		connection.release();
	}
};
/**
 * Run insertLatestVideos once and, when it reports a non-zero result, bump the failure
 * count and ask addJobToQueue to reschedule with the matching entry of delayMap.
 *
 * @param client Database client passed through to insertLatestVideos.
 * @param failedCount Failure count carried by the job that triggered this run.
 */
const executeTask = async (client: Client, failedCount: number) => {
	logger.log("getLatestVideos now executing", "task");
	const result = await insertLatestVideos(client);

	// A zero result resets the count, and a zero count never touches the scheduler.
	if (result === 0) {
		return;
	}

	const nextFailedCount = truncate(failedCount + 1, 0, 5);
	if (nextFailedCount === 0) {
		return;
	}
	addJobToQueue(nextFailedCount, delayMap[nextFailedCount] * MINUTE);
};

46
lib/mq/rateLimiter.ts Normal file
View File

@ -0,0 +1,46 @@
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
/**
 * One limit enforced by a RateLimiter: a sliding window paired with an event threshold.
 */
export interface RateLimiterConfig {
// Sliding window used to record and count the events for this limit.
window: SlidingWindow;
// Threshold: RateLimiter.getAvailability() reports false once the window's count reaches this value.
max: number;
}
/**
 * Rate limiter made of one or more (sliding window, max) limits that must all hold.
 * Each limit records its events under its own name, `<name>_config_<index>`.
 */
export class RateLimiter {
	private readonly configs: RateLimiterConfig[];
	private readonly configEventNames: string[];

	/**
	 * @param name The name of the rate limiter
	 * @param configs The configuration of the rate limiter
	 */
	constructor(name: string, configs: RateLimiterConfig[]) {
		this.configs = configs;
		this.configEventNames = Array.from(configs, (_, index) => `${name}_config_${index}`);
	}

	/**
	 * Check whether another event is currently allowed.
	 * Limits are checked in order; the first one whose count has reached its max ends the check.
	 *
	 * @returns false if any limit has been reached, true otherwise
	 */
	async getAvailability(): Promise<boolean> {
		for (const [index, { window: slidingWindow, max }] of this.configs.entries()) {
			const used = await slidingWindow.count(this.configEventNames[index]);
			if (used >= max) {
				return false;
			}
		}
		return true;
	}

	/**
	 * Record one event in every configured window, one after another.
	 */
	async trigger(): Promise<void> {
		for (const [index, { window: slidingWindow }] of this.configs.entries()) {
			await slidingWindow.event(this.configEventNames[index]);
		}
	}
}

128
lib/mq/scheduler.ts Normal file
View File

@ -0,0 +1,128 @@
import logger from "lib/log/logger.ts";
import {RateLimiter} from "lib/mq/rateLimiter.ts";
/**
 * A registered proxy, as stored by NetScheduler.
 */
interface Proxy {
// Transport kind; NetScheduler.proxyRequest handles only "native" and rejects anything else as NOT_IMPLEMENTED.
type: string;
// Task label supplied to addProxy(); not read in the code visible here — presumably the task this proxy serves (TODO confirm).
task: string;
// Optional extra string settings; neither populated nor read in the code visible here.
data?: {
[key: string]: string;
};
// Optional rate limiter; a proxy without one is treated as always available.
limiter?: RateLimiter;
}
/**
 * Proxies keyed by the name they were registered under.
 */
interface ProxiesMap {
[name: string]: Proxy;
}
/**
 * Machine-readable reason attached to every NetSchedulerError.
 */
type NetSchedulerErrorCode =
	// No proxy could take the request right now
	| "NO_AVAILABLE_PROXY"
	// The chosen proxy is under rate limit
	| "PROXY_RATE_LIMITED"
	// No proxy is registered under the requested name
	| "PROXY_NOT_FOUND"
	// The native `fetch` call (or reading its body) threw
	| "FETCH_ERROR"
	// The proxy type is not supported
	| "NOT_IMPLEMENTED";

/**
 * Error raised by the net scheduler; `errorCode` tells callers which failure occurred.
 */
export class NetSchedulerError extends Error {
	/**
	 * @param message Human-readable description
	 * @param errorCode Machine-readable failure reason
	 */
	constructor(message: string, public errorCode: NetSchedulerErrorCode) {
		super(message);
		this.name = "NetSchedulerError";
	}
}
/**
 * Sends HTTP requests through a registry of named proxies, honouring the optional
 * rate limiter attached to each proxy.
 */
export class NetScheduler {
	private proxies: ProxiesMap = {};

	/**
	 * Register a proxy under `name`. Registering an existing name replaces the stored
	 * entry, which also discards any limiter previously attached to it.
	 */
	addProxy(name: string, type: string, task: string): void {
		this.proxies[name] = { type, task };
	}

	/**
	 * Remove the proxy registered under `name`; unknown names are ignored.
	 */
	removeProxy(name: string): void {
		delete this.proxies[name];
	}

	/**
	 * Attach (or replace) the rate limiter of a registered proxy.
	 * @param name The name of the proxy
	 * @param limiter The limiter consulted before, and triggered on, every request through the proxy
	 * @throws NetSchedulerError with error code PROXY_NOT_FOUND when no proxy is registered under `name`
	 */
	setProxyLimiter(name: string, limiter: RateLimiter): void {
		const proxy = this.proxies[name];
		if (!proxy) {
			// Fail with the scheduler's own error type instead of a TypeError on `undefined`.
			throw new NetSchedulerError(`Proxy "${name}" not found`, "PROXY_NOT_FOUND");
		}
		proxy.limiter = limiter;
	}

	/**
	 * Make a request to the specified URL with any available proxy.
	 *
	 * Proxies are tried in the key order of the registry. The first proxy that has no
	 * limiter, or whose limiter reports availability, handles the request; errors from
	 * that proxy propagate and no further proxy is tried.
	 *
	 * @param url The URL to request.
	 * @param method The HTTP method to use for the request. Default is "GET".
	 * @returns The parsed JSON response body.
	 * @throws NetSchedulerError in the following cases:
	 * - No available proxy currently: with error code NO_AVAILABLE_PROXY
	 * - Proxy is under rate limit (it became limited between the two checks): with error code PROXY_RATE_LIMITED
	 * - The native `fetch` function threw an error: with error code FETCH_ERROR
	 * - The proxy type is not supported: with error code NOT_IMPLEMENTED
	 */
	async request<R>(url: string, method: string = "GET"): Promise<R | null> {
		// find an available proxy
		for (const proxyName of Object.keys(this.proxies)) {
			const limiter = this.proxies[proxyName].limiter;
			if (!limiter || (await limiter.getAvailability())) {
				return await this.proxyRequest<R>(url, proxyName, method);
			}
		}
		throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
	}

	/**
	 * Make a request to the specified URL with the specified proxy.
	 *
	 * @param url The URL to request.
	 * @param proxyName The name of the proxy to use.
	 * @param method The HTTP method to use for the request. Default is "GET".
	 * @param force If true, the request will be made even if the proxy is rate limited. Default is false.
	 * @returns The parsed JSON response body.
	 * @throws NetSchedulerError in the following cases:
	 * - Proxy not found: with error code PROXY_NOT_FOUND
	 * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
	 * - The native `fetch` function threw an error: with error code FETCH_ERROR
	 * - The proxy type is not supported: with error code NOT_IMPLEMENTED
	 */
	async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> {
		const proxy = this.proxies[proxyName];
		// Check existence first: reading `proxy.limiter` on an unknown proxy would throw a TypeError.
		if (!proxy) {
			throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
		}

		const limiter = proxy.limiter;
		if (limiter !== undefined) {
			if (!force && !(await limiter.getAvailability())) {
				throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
			}
			// Record the request against the limiter before dispatching it.
			await limiter.trigger();
		}

		switch (proxy.type) {
			case "native":
				return await this.nativeRequest<R>(url, method);
			default:
				throw new NetSchedulerError(`Proxy type ${proxy.type} not supported.`, "NOT_IMPLEMENTED");
		}
	}

	/**
	 * Report whether the named proxy can currently take a request.
	 * @param name The name of the proxy
	 * @returns true when the proxy is unknown or has no limiter; otherwise the limiter's availability
	 */
	async getProxyAvailability(name: string): Promise<boolean> {
		const proxyConfig = this.proxies[name];
		if (!proxyConfig || !proxyConfig.limiter) {
			return true;
		}
		return await proxyConfig.limiter.getAvailability();
	}

	/**
	 * Perform the request with the runtime's `fetch` and parse the body as JSON.
	 * Any error from fetching or parsing is logged and rethrown as FETCH_ERROR.
	 */
	private async nativeRequest<R>(url: string, method: string): Promise<R> {
		try {
			const response = await fetch(url, { method });
			return await response.json() as R;
		} catch (e) {
			logger.error(e as Error);
			throw new NetSchedulerError("Fetch error", "FETCH_ERROR");
		}
	}
}

46
lib/mq/slidingWindow.ts Normal file
View File

@ -0,0 +1,46 @@
import { Redis } from "ioredis";
/**
 * Sliding-window event counter backed by a Redis sorted set per event name.
 * Members are scored with their millisecond timestamp; members scored at or below
 * `now - windowSize` are dropped whenever the window is written to or counted.
 */
export class SlidingWindow {
	private redis: Redis;
	private readonly windowSize: number;

	/**
	 * @param redisClient Redis connection used for all sorted-set operations
	 * @param windowSize Window length, in the same unit as Date.now() (milliseconds)
	 */
	constructor(redisClient: Redis, windowSize: number) {
		this.redis = redisClient;
		this.windowSize = windowSize;
	}

	/** Build the Redis key that holds the events of `eventName`. */
	private keyOf(eventName: string): string {
		return `cvsa:sliding_window:${eventName}`;
	}

	/** Remove the members of `key` that fall outside the window ending at `now`. */
	private async evictExpired(key: string, now: number): Promise<void> {
		await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
	}

	/**
	 * Trigger an event in the sliding window
	 * @param eventName The name of the event
	 */
	async event(eventName: string): Promise<void> {
		const now = Date.now();
		const key = this.keyOf(eventName);
		// The random suffix keeps members distinct when several events share a timestamp.
		const uniqueMember = `${now}-${Math.random()}`;
		await this.redis.zadd(key, now, uniqueMember);
		await this.evictExpired(key, now);
	}

	/**
	 * Count the number of events in the sliding window
	 * @param eventName The name of the event
	 */
	async count(eventName: string): Promise<number> {
		const key = this.keyOf(eventName);
		const now = Date.now();
		await this.evictExpired(key, now);
		return this.redis.zcard(key);
	}

	/**
	 * Delete every recorded event of `eventName`
	 * @param eventName The name of the event
	 * @returns The reply of the Redis DEL command
	 */
	async clear(eventName: string): Promise<number> {
		return await this.redis.del(this.keyOf(eventName));
	}
}

View File

@ -20,6 +20,7 @@ export async function getBiliBiliVideoInfo(bvidORaid?: string | number, region:
async function proxyRequestWithRegion(url: string, region: string): Promise<any | null> { async function proxyRequestWithRegion(url: string, region: string): Promise<any | null> {
const td = new TextDecoder(); const td = new TextDecoder();
// aliyun configure set --access-key-id $ALIYUN_AK --access-key-secret $ALIYUN_SK --region cn-shenzhen --profile CVSA-shenzhen --mode AK
const p = await new Deno.Command("aliyun", { const p = await new Deno.Command("aliyun", {
args: [ args: [
"fc", "fc",

View File

@ -0,0 +1,96 @@
import {assertEquals} from "jsr:@std/assert";
import {redis} from "lib/db/redis.ts";
import {SlidingWindow} from "lib/mq/slidingWindow.ts";
import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
import logger from "lib/log/logger.ts";
// Integration test against the Redis client from lib/db/redis.ts: a limiter with a single
// window closes once the window is full and reopens after the window has elapsed.
Deno.test("RateLimiter works correctly", async () => {
// Start from a clean key so leftovers from an earlier run cannot skew the counts
await redis.del("cvsa:sliding_window:test_event_config_0");
const windowSize = 5000;
const maxRequests = 10;
const slidingWindow = new SlidingWindow(redis, windowSize);
const config: RateLimiterConfig = {
window: slidingWindow,
max: maxRequests,
};
const rateLimiter = new RateLimiter("test_event", [config]);
// Initial availability should be true
assertEquals(await rateLimiter.getAvailability(), true);
// Trigger one event more than the limit (maxRequests + 1 in total)
for (let i = 0; i < maxRequests + 1; i++) {
await rateLimiter.trigger();
}
// Debug output of the availability right after the burst
logger.debug(`${await rateLimiter.getAvailability()}`);
// Availability should now be false
assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize + 500)); // Add a small buffer
// Availability should be true again
assertEquals(await rateLimiter.getAvailability(), true);
// Clean up Redis after the test (important!)
await redis.del("cvsa:sliding_window:test_event_config_0");
});
// Integration test against the Redis client from lib/db/redis.ts: a limiter with a short and
// a long window is unavailable whenever either of the two limits has been reached.
Deno.test("Multiple configs work correctly", async () => {
// Start from clean keys; the limiter names them "<name>_config_<index>"
await redis.del("cvsa:sliding_window:test_event_multi_config_0");
await redis.del("cvsa:sliding_window:test_event_multi_config_1");
const windowSize1 = 1000; // 1 second window
const maxRequests1 = 2;
const windowSize2 = 5000; // 5 second window
const maxRequests2 = 6;
const slidingWindow1 = new SlidingWindow(redis, windowSize1);
const config1: RateLimiterConfig = {
window: slidingWindow1,
max: maxRequests1,
};
const slidingWindow2 = new SlidingWindow(redis, windowSize2);
const config2: RateLimiterConfig = {
window: slidingWindow2,
max: maxRequests2,
};
const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]);
// Initial availability should be true
assertEquals(await rateLimiter.getAvailability(), true);
// Trigger events up to the limit of the first config
for (let i = 0; i < maxRequests1; i++) {
await rateLimiter.trigger();
}
// Availability should now be false (due to config1)
assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the first window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize1 + 500)); // Add a small buffer
// Availability should be true again: config1 has emptied and config2 is still below its max
assertEquals(await rateLimiter.getAvailability(), true);
// Trigger the remaining events needed to reach the limit of the second config
for (let i = maxRequests1; i < maxRequests2; i++) {
await rateLimiter.trigger();
}
// Availability should be false again (config2 has reached its max)
assertEquals(await rateLimiter.getAvailability(), false);
// Wait for the second window to slide
await new Promise((resolve) => setTimeout(resolve, windowSize2 + 500)); // Add a small buffer
// Availability should be true again
assertEquals(await rateLimiter.getAvailability(), true);
// Clean up Redis after the test (important!)
await redis.del("cvsa:sliding_window:test_event_multi_config_0");
await redis.del("cvsa:sliding_window:test_event_multi_config_1");
});

View File

@ -0,0 +1,101 @@
import { assertEquals } from "jsr:@std/assert";
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
import { Redis } from "ioredis";
// A single recorded event must be counted exactly once.
Deno.test("SlidingWindow - event and count", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName = "test_event";
		// Await the reset so it is guaranteed to finish (and to surface errors) before recording.
		await slidingWindow.clear(eventName);
		await slidingWindow.event(eventName);
		const count = await slidingWindow.count(eventName);
		assertEquals(count, 1);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});
// Several events recorded inside the window must all be counted.
Deno.test("SlidingWindow - multiple events", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName = "test_event";
		// Await the reset so it is guaranteed to finish (and to surface errors) before recording.
		await slidingWindow.clear(eventName);
		await slidingWindow.event(eventName);
		await slidingWindow.event(eventName);
		await slidingWindow.event(eventName);
		const count = await slidingWindow.count(eventName);
		assertEquals(count, 3);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});
// An entry older than the window must be evicted and therefore not counted.
Deno.test("SlidingWindow - events outside window", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName = "test_event";
		// Await the reset so it is guaranteed to finish (and to surface errors) before seeding.
		await slidingWindow.clear(eventName);
		const now = Date.now();
		// Seed one entry directly in Redis with a timestamp that lies outside the window
		await redis.zadd(`cvsa:sliding_window:${eventName}`, now - windowSize - 1000, now - windowSize - 1000);
		await slidingWindow.event(eventName); // Event inside the window
		const count = await slidingWindow.count(eventName);
		assertEquals(count, 1);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});
// A window without any recorded event must report a count of zero.
Deno.test("SlidingWindow - no events", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName = "test_event";
		// Await the reset so the count below cannot observe a key that is still being deleted.
		await slidingWindow.clear(eventName);
		const count = await slidingWindow.count(eventName);
		assertEquals(count, 0);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});
// Events recorded under different names must be counted independently.
Deno.test("SlidingWindow - different event names", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName1 = "test_event_1";
		const eventName2 = "test_event_2";
		// Await both resets so they are guaranteed to finish (and to surface errors) before recording.
		await slidingWindow.clear(eventName1);
		await slidingWindow.clear(eventName2);
		await slidingWindow.event(eventName1);
		await slidingWindow.event(eventName2);
		const count1 = await slidingWindow.count(eventName1);
		const count2 = await slidingWindow.count(eventName2);
		assertEquals(count1, 1);
		assertEquals(count2, 1);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});
// Many events recorded in quick succession must all be counted.
// The expectation only holds while the whole loop finishes within the 5 second window.
Deno.test("SlidingWindow - large number of events", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	try {
		const windowSize = 5000; // 5 seconds
		const slidingWindow = new SlidingWindow(redis, windowSize);
		const eventName = "test_event";
		// Await the reset so it is guaranteed to finish (and to surface errors) before recording.
		await slidingWindow.clear(eventName);
		const numEvents = 1000;
		for (let i = 0; i < numEvents; i++) {
			await slidingWindow.event(eventName);
		}
		const count = await slidingWindow.count(eventName);
		assertEquals(count, numEvents);
	} finally {
		// Close the connection even when an assertion fails, and wait for it to finish.
		await redis.quit();
	}
});

View File

@ -1,5 +1,5 @@
import { Job, Worker } from "bullmq"; import { Job, Worker } from "bullmq";
import { insertVideosWorker } from "lib/mq/executors.ts"; import { getLatestVideosWorker } from "lib/mq/executors.ts";
import { redis } from "lib/db/redis.ts"; import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
@ -8,7 +8,7 @@ const worker = new Worker(
async (job: Job) => { async (job: Job) => {
switch (job.name) { switch (job.name) {
case "getLatestVideos": case "getLatestVideos":
await insertVideosWorker(job); await getLatestVideosWorker(job);
break; break;
default: default:
break; break;