cvsa/packages/core/net/delegate.ts
2025-05-18 03:52:42 +08:00

400 lines
12 KiB
TypeScript

import logger from "@core/log/logger.ts";
import { MultipleRateLimiter, RateLimiterError, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts";
import { ReplyError } from "ioredis";
import { SECOND } from "@core/const/time.ts";
import { spawn, SpawnOptions } from "child_process";
/**
 * Runs a command as a child process and collects everything it writes to stdout and stderr.
 *
 * @param command - Executable to run.
 * @param args - Arguments passed to the executable. Defaults to none.
 * @param options - Forwarded unchanged to `child_process.spawn`.
 * @returns A promise that resolves with the captured `stdout` and `stderr` once the process
 * closes with exit code 0.
 * @throws The promise rejects with an `Error` containing the exit code and captured stderr when
 * the process closes with any other code (including `null`), or with the error emitted by the
 * child process itself (for example when it cannot be spawned).
 */
export function spawnPromise(
	command: string,
	args: string[] = [],
	options?: SpawnOptions,
): Promise<{ stdout: string; stderr: string }> {
	return new Promise((resolve, reject) => {
		const child = spawn(command, args, options);
		let stdout = "";
		let stderr = "";

		// Let the streams decode UTF-8 themselves. Appending raw Buffer chunks to a string
		// decodes each chunk in isolation, which corrupts any multi-byte character that
		// happens to straddle a chunk boundary.
		child.stdout?.setEncoding("utf8");
		child.stderr?.setEncoding("utf8");

		child.stdout?.on("data", (chunk: string) => {
			stdout += chunk;
		});
		child.stderr?.on("data", (chunk: string) => {
			stderr += chunk;
		});

		child.on("close", (code) => {
			if (code !== 0) {
				reject(new Error(`Error code: ${code}\nstderr: ${stderr}`));
			} else {
				resolve({ stdout, stderr });
			}
		});
		child.on("error", (err) => {
			reject(err);
		});
	});
}
/** A registered proxy endpoint. */
interface Proxy {
	/** Transport selector; this module dispatches on "native" and "alicloud-fc". */
	type: string;
	/** Transport-specific payload: the region name for "alicloud-fc", an empty string for "native". */
	data: string;
}

/** A named kind of request, tied to one provider and a set of proxies it may use. */
interface Task {
	/** Provider name; used to build the ids of provider-level limiters. */
	provider: string;
	/** Names of the proxies this task may use, or "all" for every registered proxy. */
	proxies: string[] | "all";
}

/** Registered proxies, keyed by proxy name. */
type ProxiesMap = Record<string, Proxy>;
/**
 * Machine-readable failure categories carried by NetSchedulerError.
 */
type NetworkDelegateErrorCode =
	| "NO_PROXY_AVAILABLE"
	| "PROXY_RATE_LIMITED"
	| "PROXY_NOT_FOUND"
	| "FETCH_ERROR"
	| "NOT_IMPLEMENTED"
	| "ALICLOUD_PROXY_ERR";

/**
 * Error raised by the network delegate.
 *
 * Besides the human-readable message it carries a `code` for programmatic handling and,
 * optionally, the underlying error that caused the failure.
 */
export class NetSchedulerError extends Error {
	/** Failure category. */
	public code: NetworkDelegateErrorCode;
	/** The underlying error or value, when one was supplied; otherwise `undefined`. */
	public rawError: unknown;

	/**
	 * @param message - Human-readable description.
	 * @param errorCode - Failure category stored in `code`.
	 * @param rawError - Optional underlying cause stored in `rawError`.
	 */
	constructor(message: string, errorCode: NetworkDelegateErrorCode, rawError?: unknown) {
		super(message);
		this.rawError = rawError;
		this.code = errorCode;
		this.name = "NetSchedulerError";
	}
}
/** Provider-level limiters, keyed by limiter id. */
type LimiterMap = Record<string, MultipleRateLimiter>;

/** Task-level limiters, keyed by limiter id; `null` marks a task/proxy pair with no limiter. */
type OptionalLimiterMap = Record<string, MultipleRateLimiter | null>;

/** Registered tasks, keyed by task name. */
type TaskMap = Record<string, Task>;
/**
 * Returns a shuffled copy of `array` (Fisher-Yates); the input array is left untouched.
 *
 * @param array - Items to shuffle.
 * @returns A new array holding the same items in random order.
 */
function shuffleArray<T>(array: T[]): T[] {
	const shuffled = array.slice();
	let last = shuffled.length - 1;
	// Walk from the end, swapping each slot with a random slot at or before it.
	while (last > 0) {
		const pick = Math.floor(Math.random() * (last + 1));
		const held = shuffled[last];
		shuffled[last] = shuffled[pick];
		shuffled[pick] = held;
		last--;
	}
	return shuffled;
}
/**
 * Sends requests through a pool of named proxies, applying optional task-level and
 * provider-level rate limiters before every request.
 *
 * Limiter ids are derived from the proxies a task resolves to at the time a limiter is set,
 * so proxies must be added before tasks, and tasks before limiters.
 */
class NetworkDelegate {
	private proxies: ProxiesMap = {};
	private providerLimiters: LimiterMap = {};
	private proxyLimiters: OptionalLimiterMap = {};
	private tasks: TaskMap = {};

	/**
	 * Registers (or replaces) a proxy.
	 *
	 * @param proxyName - Name the proxy is referred to by.
	 * @param type - Transport type; "native" and "alicloud-fc" are the supported values.
	 * @param data - Transport-specific payload (the region for "alicloud-fc").
	 */
	addProxy(proxyName: string, type: string, data: string): void {
		this.proxies[proxyName] = { type, data };
	}

	/**
	 * Registers (or replaces) a task.
	 *
	 * @param taskName - Name of the task.
	 * @param provider - Provider the task talks to.
	 * @param proxies - Names of the proxies the task may use, or "all" for every registered proxy.
	 */
	addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
		this.tasks[taskName] = { provider, proxies };
	}

	/**
	 * Resolves the proxy names a task may use.
	 *
	 * @param taskName - Name of the task.
	 * @returns The task's proxy list, every currently registered proxy when the task uses "all",
	 * or an empty array when the task is unknown.
	 */
	getTaskProxies(taskName: string): string[] {
		const task = this.tasks[taskName];
		if (!task) {
			return [];
		}
		return task.proxies === "all" ? Object.keys(this.proxies) : task.proxies;
	}

	/**
	 * Sets the task-level limiter for every proxy the task currently resolves to.
	 *
	 * @param taskName - Name of the task.
	 * @param config - Limiter windows, or `null` to record that the task has no task-level limiter.
	 */
	setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void {
		for (const proxyName of this.getTaskProxies(taskName)) {
			const limiterId = "proxy-" + proxyName + "-" + taskName;
			this.proxyLimiters[limiterId] = config ? new MultipleRateLimiter(limiterId, config) : null;
		}
	}

	/**
	 * Triggers the task-level limiter and then the provider-level limiter for a task/proxy pair.
	 *
	 * A `RateLimiterError` is re-thrown so callers can react to it. Any other limiter failure is
	 * logged and swallowed, which means the request proceeds.
	 *
	 * @param task - Name of a registered task; an unregistered name fails with a `TypeError`
	 * because the task's provider is read unconditionally.
	 * @param proxy - Name of the proxy about to be used.
	 * @param force - Passed to the limiters negated (`trigger(!force)`).
	 * NOTE(review): presumably the flag controls whether the limiter rejects when over the
	 * limit - confirm against MultipleRateLimiter.
	 * @throws {RateLimiterError} When one of the limiters raises it.
	 */
	async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise<void> {
		const limiterId = "proxy-" + proxy + "-" + task;
		const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
		try {
			await this.proxyLimiters[limiterId]?.trigger(!force);
			await this.providerLimiters[providerLimiterId]?.trigger(!force);
		} catch (e) {
			if (e instanceof RateLimiterError) {
				// Re-throw it to ensure this.request can catch it
				throw e;
			}
			if (e instanceof ReplyError) {
				// Already reported as a Redis failure; return so it is not also logged
				// below as an "Unhandled error".
				logger.error(e as Error, "redis");
				return;
			}
			const message = e instanceof Error ? e.message : String(e);
			logger.warn(`Unhandled error: ${message}`, "mq", "proxyRequest");
		}
	}

	/**
	 * Sets the provider-level limiter for every proxy used by any task of the provider.
	 *
	 * @param providerName - Name of the provider.
	 * @param config - Limiter windows applied per proxy.
	 */
	setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void {
		// Several tasks usually share proxies; a Set avoids building the same limiter repeatedly.
		const boundProxies = new Set<string>();
		for (const [taskName, task] of Object.entries(this.tasks)) {
			if (task.provider !== providerName) continue;
			for (const proxyName of this.getTaskProxies(taskName)) {
				boundProxies.add(proxyName);
			}
		}
		for (const proxyName of boundProxies) {
			const limiterId = "provider-" + proxyName + "-" + providerName;
			this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, config);
		}
	}

	/**
	 * Makes a request to the specified URL with any available proxy of the task.
	 *
	 * The task's proxies are tried in random order. A proxy that is rate limited is skipped;
	 * any other failure is re-thrown immediately without trying the remaining proxies.
	 *
	 * @param url - The URL to request.
	 * @param task - The name of the task the request belongs to.
	 * @param method - The HTTP method to use for the request. Default is "GET".
	 * @returns A promise that resolves to the parsed response body.
	 * @throws {NetSchedulerError} In the following cases:
	 * - No proxy is available currently (all are rate limited, or the task has none): code `NO_PROXY_AVAILABLE`
	 * - The native `fetch` function threw an error: code `FETCH_ERROR`
	 * - The alicloud-fc proxy failed: code `ALICLOUD_PROXY_ERR`
	 * - The proxy type is not supported: code `NOT_IMPLEMENTED`
	 */
	async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
		// find an available proxy
		const proxiesNames = this.getTaskProxies(task);
		for (const proxyName of shuffleArray(proxiesNames)) {
			try {
				return await this.proxyRequest<R>(url, proxyName, task, method);
			} catch (e) {
				if (e instanceof RateLimiterError) {
					continue;
				}
				throw e;
			}
		}
		throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
	}

	/**
	 * Makes a request to the specified URL with the specified proxy.
	 *
	 * @param url - The URL to request.
	 * @param proxyName - The name of the proxy to use.
	 * @param task - The name of the task the request belongs to.
	 * @param method - The HTTP method to use for the request. Default is "GET".
	 * @param force - Forwarded to `triggerLimiter`. Default is false.
	 * @returns A promise that resolves to the parsed response body.
	 * @throws {RateLimiterError} When a limiter rejects the request (propagated from `triggerLimiter`).
	 * @throws {NetSchedulerError} In the following cases:
	 * - Proxy not found: code `PROXY_NOT_FOUND`
	 * - The native `fetch` function threw an error: code `FETCH_ERROR`
	 * - The alicloud-fc proxy failed: code `ALICLOUD_PROXY_ERR`
	 * - The proxy type is not supported: code `NOT_IMPLEMENTED`
	 */
	async proxyRequest<R>(
		url: string,
		proxyName: string,
		task: string,
		method: string = "GET",
		force: boolean = false,
	): Promise<R> {
		const proxy = this.proxies[proxyName];
		if (!proxy) {
			throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
		}
		await this.triggerLimiter(task, proxyName, force);
		return await this.makeRequest<R>(url, proxy, method);
	}

	/**
	 * Dispatches the request to the transport matching the proxy type.
	 * `method` is only honoured by the "native" transport; "alicloud-fc" receives just the URL.
	 */
	private async makeRequest<R>(url: string, proxy: Proxy, method: string): Promise<R> {
		switch (proxy.type) {
			case "native":
				return await this.nativeRequest<R>(url, method);
			case "alicloud-fc":
				return await this.alicloudFcRequest<R>(url, proxy.data);
			default:
				throw new NetSchedulerError(`Proxy type ${proxy.type} not supported`, "NOT_IMPLEMENTED");
		}
	}

	/**
	 * Fetches the URL directly and parses the body as JSON.
	 * The request is aborted after `10 * SECOND`; the timer covers both the fetch and the body
	 * read, and is always cleared afterwards.
	 *
	 * @throws {NetSchedulerError} With code `FETCH_ERROR` wrapping any failure (network error,
	 * abort, invalid JSON).
	 */
	private async nativeRequest<R>(url: string, method: string): Promise<R> {
		const controller = new AbortController();
		const timeout = setTimeout(() => controller.abort(), 10 * SECOND);
		try {
			const response = await fetch(url, {
				method,
				signal: controller.signal,
			});
			return (await response.json()) as R;
		} catch (e) {
			throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
		} finally {
			// Clear on every path so a failed fetch does not leave a pending timer behind.
			clearTimeout(timeout);
		}
	}

	/**
	 * Proxies the URL through the function `proxy-<region>` by invoking the `aliyun` CLI,
	 * then parses the `body` field of the CLI's JSON output.
	 *
	 * Every failure - spawn error, unparsable output, or a `statusCode` other than 200 - is
	 * logged and surfaces as a `NetSchedulerError` with code `ALICLOUD_PROXY_ERR`. The non-200
	 * error thrown inside the `try` is itself caught below and ends up in `rawError`.
	 */
	private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
		try {
			const output = await spawnPromise("aliyun", [
				"fc",
				"POST",
				`/2023-03-30/functions/proxy-${region}/invocations`,
				"--qualifier",
				"LATEST",
				"--header",
				"Content-Type=application/json;x-fc-invocation-type=Sync;x-fc-log-type=None;",
				"--body",
				JSON.stringify({ url: url }),
				"--retry-count",
				"5",
				"--read-timeout",
				"30",
				"--connect-timeout",
				"10",
				"--profile",
				`CVSA-${region}`,
			]);
			// NOTE(review): assumes the CLI prints `{ statusCode, body }` with `body` a JSON string - confirm.
			const rawData: { statusCode: number; body: string } = JSON.parse(output.stdout);
			if (rawData.statusCode !== 200) {
				// noinspection ExceptionCaughtLocallyJS
				throw new NetSchedulerError(
					`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
					"ALICLOUD_PROXY_ERR",
				);
			}
			return JSON.parse(rawData.body) as R;
		} catch (e) {
			logger.error(e as Error, "net", "fn:alicloudFcRequest");
			throw new NetSchedulerError(
				`Unhandled error: Cannot proxy ${url} to ali-fc-${region}.`,
				"ALICLOUD_PROXY_ERR",
				e,
			);
		}
	}
}
// Module-wide delegate instance. It is configured by the setup statements further down
// (proxies, tasks, limiters) and is the module's default export.
const networkDelegate = new NetworkDelegate();
// Task-level limiter windows for "getVideoInfo": at most `max` triggers per `duration`.
// NOTE(review): `duration` is presumably in seconds - confirm against MultipleRateLimiter.
const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
	{ duration: 0.3, max: 1 },
	{ duration: 3, max: 5 },
	{ duration: 30, max: 30 },
	{ duration: 2 * 60, max: 50 },
];
// Provider-level limiter windows for the "bilibili" provider: at most `max` triggers per `duration`.
// NOTE(review): `duration` is presumably in seconds - confirm against MultipleRateLimiter.
const biliLimiterConfig: RateLimiterConfig[] = [
	{ duration: 1, max: 6 },
	{ duration: 5, max: 20 },
	{ duration: 30, max: 100 },
	{ duration: 5 * 60, max: 200 },
];

/**
 * Builds a new config with the same windows as `base` but different `max` values.
 *
 * Every window object is copied. Spreading only the array (`[...base]`) would share the
 * window objects, so assigning `.max` on the copy would also rewrite `base` and every
 * other config derived from it.
 *
 * @param base - Config whose windows are reused.
 * @param maxes - Replacement `max` per window, by position; a missing entry keeps the base value.
 */
function withMaxOverrides(base: readonly RateLimiterConfig[], maxes: readonly number[]): RateLimiterConfig[] {
	return base.map((window, index) => ({ ...window, max: maxes[index] ?? window.max }));
}

// Same windows as biliLimiterConfig with higher ceilings (used by the "bili_test" provider).
const bili_test = withMaxOverrides(biliLimiterConfig, [10, 36, 150, 1000]);

// Same windows as biliLimiterConfig with lower ceilings (used by the "bili_strict" provider).
const bili_strict = withMaxOverrides(biliLimiterConfig, [1, 4, 12, 100]);
/*
Execution order for setup:
1. addProxy(proxyName, type, data):
- Must be called first. Registers proxies in the system, making them available for tasks.
- Define all proxies before proceeding to define tasks or set up limiters.
2. addTask(taskName, provider, proxies):
- Call after addProxy. Defines tasks and associates them with providers and proxies.
- Relies on proxies being already added.
- Must be called before setting task-specific or provider-specific limiters.
3. setTaskLimiter(taskName, config):
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
The order of setTaskLimiter and setProviderLimiter relative to each other is flexible,
but both should come after addProxy and addTask to ensure proper setup and dependencies are met.
*/
// Regions that get an "alicloud-fc" proxy named `alicloud-<region>`.
const regions = ["shanghai", "hangzhou", "qingdao", "beijing", "zhangjiakou", "chengdu", "shenzhen", "hohhot"];

// Step 1: proxies. The direct ("native") one first, then one per region.
networkDelegate.addProxy("native", "native", "");
regions.forEach((region) => {
	networkDelegate.addProxy(`alicloud-${region}`, "alicloud-fc", region);
});

// Proxy list shared by the two snapshot tasks below; a fresh array is produced per task.
// NOTE(review): hangzhou and beijing are absent from this list - confirm that is intentional.
const snapshotProxyNames = (): string[] => [
	"alicloud-qingdao",
	"alicloud-shanghai",
	"alicloud-zhangjiakou",
	"alicloud-chengdu",
	"alicloud-shenzhen",
	"alicloud-hohhot",
];

// Step 2: tasks, each bound to a provider and to the proxies it may use.
networkDelegate.addTask("getVideoInfo", "bilibili", "all");
networkDelegate.addTask("getLatestVideos", "bilibili", "all");
networkDelegate.addTask(
	"snapshotMilestoneVideo",
	"bilibili",
	regions.map((region) => `alicloud-${region}`),
);
networkDelegate.addTask("snapshotVideo", "bili_test", snapshotProxyNames());
networkDelegate.addTask("bulkSnapshot", "bili_strict", snapshotProxyNames());

// Step 3: task-level limiters. Only "getVideoInfo" has one; the rest are explicitly set to none.
networkDelegate.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
for (const unlimitedTask of ["getLatestVideos", "snapshotMilestoneVideo", "snapshotVideo", "bulkSnapshot"]) {
	networkDelegate.setTaskLimiter(unlimitedTask, null);
}

// Step 4: provider-level limiters, applied per proxy used by the provider's tasks.
const providerLimiterConfigs: [string, RateLimiterConfig[]][] = [
	["bilibili", biliLimiterConfig],
	["bili_test", bili_test],
	["bili_strict", bili_strict],
];
for (const [providerName, limiterConfig] of providerLimiterConfigs) {
	networkDelegate.setProviderLimiter(providerName, limiterConfig);
}

export default networkDelegate;