update: support for provider-level limiter in NetScheduler

add: fn:getLatestVideoAids()
This commit is contained in:
alikia2x (寒寒) 2025-02-26 01:40:01 +08:00
parent 232585594a
commit 7566722d04
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
2 changed files with 138 additions and 73 deletions

View File

@ -1,14 +1,17 @@
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import {RateLimiter} from "lib/mq/rateLimiter.ts"; import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
import {SlidingWindow} from "lib/mq/slidingWindow.ts"; import {SlidingWindow} from "lib/mq/slidingWindow.ts";
import {redis} from "lib/db/redis.ts"; import {redis} from "lib/db/redis.ts";
import Redis from "ioredis"; import Redis from "ioredis";
interface Proxy { interface Proxy {
type: string; type: string;
task: string; data: string;
}
interface Task {
provider: string; provider: string;
limiter?: RateLimiter; proxies: string[] | "all";
} }
interface ProxiesMap { interface ProxiesMap {
@ -33,28 +36,87 @@ export class NetSchedulerError extends Error {
} }
} }
interface LimiterMap { type LimiterMap = {
[name: string]: RateLimiter; [name: string]: RateLimiter;
};
type OptionalLimiterMap = {
[name: string]: RateLimiter | null;
};
type TaskMap = {
[name: string]: Task;
};
/**
 * Return a randomly permuted copy of `array`; the input array is never modified.
 *
 * Works backwards through the copy, exchanging each slot with a randomly
 * chosen slot at or before it (Fisher–Yates shuffle).
 *
 * @param array - The elements to shuffle.
 * @returns A new array holding the same elements in random order.
 */
function shuffleArray<T>(array: T[]): T[] {
	const shuffled = Array.from(array);
	let remaining = shuffled.length;
	while (remaining > 1) {
		const last = remaining - 1;
		// Pick any slot in [0, last]; picking `last` itself leaves it in place.
		const pick = Math.floor(Math.random() * remaining);
		const held = shuffled[last];
		shuffled[last] = shuffled[pick];
		shuffled[pick] = held;
		remaining--;
	}
	return shuffled;
}
class NetScheduler { class NetScheduler {
private proxies: ProxiesMap = {}; private proxies: ProxiesMap = {};
private providerLimiters: LimiterMap = {}; private providerLimiters: LimiterMap = {};
private proxyLimiters: OptionalLimiterMap = {};
private tasks: TaskMap = {};
addProxy(name: string, type: string, task: string, provider: string): void { addProxy(proxyName: string, type: string, data: string): void {
this.proxies[name] = { type, task, provider }; this.proxies[proxyName] = { type, data };
} }
removeProxy(name: string): void { removeProxy(proxyName: string): void {
delete this.proxies[name]; delete this.proxies[proxyName];
} }
setProxyLimiter(name: string, limiter: RateLimiter): void { addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
this.proxies[name].limiter = limiter; this.tasks[taskName] = { provider, proxies };
} }
/**
 * Resolve the names of the proxies a task may use.
 *
 * @param taskName - Name of a task registered through addTask().
 * @returns Every registered proxy name when the task was registered with
 *          "all", otherwise the task's own proxy list. An unregistered task
 *          yields an empty list instead of throwing, so callers iterating
 *          the result simply find no proxy.
 */
getTaskProxies(taskName: string): string[] {
	// Look the task up once; an unknown name gives `undefined` at runtime.
	const task = this.tasks[taskName];
	if (!task) {
		return [];
	}
	return task.proxies === "all" ? Object.keys(this.proxies) : task.proxies;
}
/**
 * Set or clear the rate limiter of a task on each proxy bound to that task.
 *
 * One entry per proxy is written to `proxyLimiters` under the ID
 * "proxy-<proxyName>-<taskName>".
 *
 * @param taskName - Name of the task; its proxies come from getTaskProxies().
 * @param config - Limiter configuration used to build a RateLimiter for each
 *                 proxy, or `null` to store a null entry (no limiter).
 */
setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void {
	this.getTaskProxies(taskName).forEach((proxyName) => {
		const limiterId = `proxy-${proxyName}-${taskName}`;
		if (config) {
			this.proxyLimiters[limiterId] = new RateLimiter(limiterId, config);
		} else {
			this.proxyLimiters[limiterId] = null;
		}
	});
}
/**
 * Record one use of a (task, proxy) pair on its rate limiter, if one is set.
 *
 * The limiter is looked up under "proxy-<proxy>-<task>", which is the ID
 * setTaskLimiter() stores it under. Without the "proxy-" prefix the lookup
 * never matches and the limiter is silently never triggered.
 *
 * Errors thrown by the limiter are caught and logged, never re-thrown: a
 * Redis.ReplyError is logged as an error under "redis", and every caught
 * error is additionally logged as a warning.
 *
 * @param task - Name of the task being executed.
 * @param proxy - Name of the proxy used for the request.
 */
async triggerLimiter(task: string, proxy: string): Promise<void> {
	const limiterId = "proxy-" + proxy + "-" + task;
	const limiter = this.proxyLimiters[limiterId];
	if (!limiter) {
		// No entry, or an explicit null entry: nothing to record.
		return;
	}
	try {
		await limiter.trigger();
	} catch (e) {
		const error = e as Error;
		if (e instanceof Redis.ReplyError) {
			logger.error(error, "redis");
		}
		logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
	}
}
/**
 * Create a provider-level rate limiter for every proxy that serves the
 * given provider.
 *
 * A proxy serves the provider when at least one registered task with that
 * provider lists it (see getTaskProxies()). Each limiter is stored under its
 * own ID, "provider-<proxyName>-<providerName>", which is the key
 * getProxyAvailability() reads. Storing under the bare provider name would
 * make each proxy overwrite the previous one and leave the limiter
 * unreachable by that lookup.
 *
 * Only tasks registered before this call are considered.
 *
 * @param providerName - Provider whose proxies receive a limiter.
 * @param config - Limiter configuration applied to each created RateLimiter.
 */
setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void {
	// A proxy shared by several tasks must get exactly one limiter.
	const boundProxies = new Set<string>();
	for (const taskName of Object.keys(this.tasks)) {
		if (this.tasks[taskName].provider !== providerName) continue;
		for (const proxyName of this.getTaskProxies(taskName)) {
			boundProxies.add(proxyName);
		}
	}
	for (const proxyName of boundProxies) {
		const limiterId = "provider-" + proxyName + "-" + providerName;
		this.providerLimiters[limiterId] = new RateLimiter(limiterId, config);
	}
}
/* /*
@ -70,11 +132,9 @@ class NetScheduler {
*/ */
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> { async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
// find a available proxy // find a available proxy
const proxiesNames = Object.keys(this.proxies); const proxiesNames = this.getTaskProxies(task);
for (const proxyName of proxiesNames) { for (const proxyName of shuffleArray(proxiesNames)) {
const proxy = this.proxies[proxyName]; if (await this.getProxyAvailability(proxyName, task)) {
if (proxy.task !== task) continue;
if (await this.getProxyAvailability(proxyName)) {
return await this.proxyRequest<R>(url, proxyName, method); return await this.proxyRequest<R>(url, proxyName, method);
} }
} }
@ -85,6 +145,7 @@ class NetScheduler {
* Make a request to the specified URL with the specified proxy * Make a request to the specified URL with the specified proxy
* @param {string} url - The URL to request. * @param {string} url - The URL to request.
* @param {string} proxyName - The name of the proxy to use. * @param {string} proxyName - The name of the proxy to use.
* @param {string} task - The name of the task to use.
* @param {string} method - The HTTP method to use for the request. Default is "GET". * @param {string} method - The HTTP method to use for the request. Default is "GET".
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false. * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body. * @returns {Promise<any>} - A promise that resolves to the response body.
@ -94,27 +155,23 @@ class NetScheduler {
* - The native `fetch` function threw an error: with error code FETCH_ERROR * - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED * - The proxy type is not supported: with error code NOT_IMPLEMENTED
*/ */
async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> { async proxyRequest<R>(
url: string,
proxyName: string,
task: string,
method: string = "GET",
force: boolean = false,
): Promise<R> {
const proxy = this.proxies[proxyName]; const proxy = this.proxies[proxyName];
if (!proxy) { if (!proxy) {
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
} }
if (!force && await this.getProxyAvailability(proxyName) === false) { if (!force && await this.getProxyAvailability(proxyName, task) === false) {
throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED"); throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
} }
if (proxy.limiter) { await this.triggerLimiter(task, proxyName);
try {
await proxy.limiter!.trigger();
} catch (e) {
const error = e as Error;
if (e instanceof Redis.ReplyError) {
logger.error(error, "redis");
}
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
}
}
switch (proxy.type) { switch (proxy.type) {
case "native": case "native":
@ -124,21 +181,20 @@ class NetScheduler {
} }
} }
private async getProxyAvailability(name: string): Promise<boolean> { private async getProxyAvailability(proxyName: string, taskName: string): Promise<boolean> {
try { try {
const proxyConfig = this.proxies[name]; const task = this.tasks[taskName];
if (!proxyConfig) { const provider = task.provider;
const proxyLimiterId = "proxy-" + proxyName + "-" + task;
const providerLimiterId = "provider-" + proxyName + "-" + provider;
if (!this.proxyLimiters[proxyLimiterId] || !this.providerLimiters[providerLimiterId]) {
return true; return true;
} }
const provider = proxyConfig.provider; const proxyLimiter = this.proxyLimiters[proxyLimiterId];
const providerLimiter = await this.providerLimiters[provider].getAvailability(); const providerLimiter = this.providerLimiters[providerLimiterId];
if (!providerLimiter) { const providerAvailable = await providerLimiter.getAvailability();
return false; const proxyAvailable = await proxyLimiter.getAvailability();
} return providerAvailable && proxyAvailable;
if (!proxyConfig.limiter) {
return true;
}
return await proxyConfig.limiter.getAvailability();
} catch (e) { } catch (e) {
const error = e as Error; const error = e as Error;
if (e instanceof Redis.ReplyError) { if (e instanceof Redis.ReplyError) {
@ -161,9 +217,7 @@ class NetScheduler {
} }
const netScheduler = new NetScheduler(); const netScheduler = new NetScheduler();
netScheduler.addProxy("default", "native", "default", "bilibili-native"); const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
netScheduler.addProxy("tags-native", "native", "getVideoTags", "bilibili-native");
const tagsRateLimiter = new RateLimiter("getVideoTags", [
{ {
window: new SlidingWindow(redis, 1), window: new SlidingWindow(redis, 1),
max: 3, max: 3,
@ -176,22 +230,26 @@ const tagsRateLimiter = new RateLimiter("getVideoTags", [
window: new SlidingWindow(redis, 2 * 60), window: new SlidingWindow(redis, 2 * 60),
max: 50, max: 50,
}, },
]); ];
const biliLimiterNative = new RateLimiter("bilibili-native", [ const biliLimiterConfig: RateLimiterConfig[] = [
{ {
window: new SlidingWindow(redis, 1), window: new SlidingWindow(redis, 1),
max: 5 max: 5,
}, },
{ {
window: new SlidingWindow(redis, 30), window: new SlidingWindow(redis, 30),
max: 100 max: 100,
}, },
{ {
window: new SlidingWindow(redis, 5 * 60), window: new SlidingWindow(redis, 5 * 60),
max: 180 max: 180,
} },
]); ];
netScheduler.setProxyLimiter("tags-native", tagsRateLimiter); netScheduler.addProxy("native", "native", "");
netScheduler.setProviderLimiter("bilibili-native", biliLimiterNative) netScheduler.addTask("getVideoInfo", "bilibili", "all");
netScheduler.addTask("getLatestVideos", "bilibili", "all");
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig)
netScheduler.setTaskLimiter("getLatestVideos", null);
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
export default netScheduler; export default netScheduler;

View File

@ -1,29 +1,36 @@
import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts"; import {VideoListResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import netScheduler, {NetSchedulerError} from "lib/mq/scheduler.ts";
/**
 * Fetch one page of the bilibili "newlist" listing (rid=30) and return the
 * aid of every video on it. The request goes through netScheduler under the
 * task name "getLatestVideos".
 *
 * @param page - 1-based page number.
 * @param pageSize - Number of videos per page.
 * @returns The aids on the page; an empty array when the API answers with a
 *          non-zero code or the page has no videos; `null` when the
 *          scheduler reports a FETCH_ERROR.
 * @throws Re-throws any error that is not a NetSchedulerError with code
 *         "FETCH_ERROR".
 */
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[] | null> {
	const startFrom = 1 + pageSize * (page - 1);
	const endTo = pageSize * page;
	const range = `${startFrom}-${endTo}`;
	// Trailing space keeps the prefix from running into the appended message.
	const errMessage = `Error fetching latest aid for ${range}: `;
	try {
		const url = `https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`;
		const data = await netScheduler.request<VideoListResponse>(url, "getLatestVideos");
		if (data.code !== 0) {
			logger.error(errMessage + data.message, "net", "getLatestVideos");
			// NOTE(review): an API error is reported as an empty page, which callers
			// cannot tell apart from "no more videos" — confirm this is intended.
			return [];
		}
		if (data.data.archives.length === 0) {
			logger.verbose("No more videos found", "net", "getLatestVideos");
			return [];
		}
		return data.data.archives.map((video) => video.aid);
	} catch (e) {
		// Only a scheduler-reported fetch failure is handled here.
		if (e instanceof NetSchedulerError && e.code === "FETCH_ERROR") {
			// Prefer the underlying error when the scheduler attached one.
			const cause: Error = e.rawError instanceof Error ? e.rawError : e;
			cause.message = errMessage + cause.message;
			logger.error(cause, "net", "getLatestVideos");
			return null;
		}
		// Re-throw anything else unchanged.
		throw e;
	}
}