400 lines
12 KiB
TypeScript
400 lines
12 KiB
TypeScript
import logger from "@core/log/logger.ts";
|
|
import { MultipleRateLimiter, RateLimiterError, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts";
|
|
import { ReplyError } from "ioredis";
|
|
import { SECOND } from "@core/const/time.ts";
|
|
import { spawn, SpawnOptions } from "child_process";
|
|
|
|
/**
 * Runs a command as a child process and collects everything it writes to
 * stdout and stderr.
 *
 * @param command - Executable to run (passed straight to `spawn`, no shell unless `options` asks for one).
 * @param args - Arguments for the executable.
 * @param options - Optional `spawn` options. If stdio is configured so that
 *   `child.stdout` / `child.stderr` are not pipes, the corresponding output is returned as `""`.
 * @returns A promise resolving to the captured `stdout` and `stderr` once the
 *   process closes with exit code 0.
 * @throws Rejects with an `Error` whose message contains the exit code and the
 *   captured stderr when the exit code is not 0 (including `null`, i.e. the
 *   process was terminated by a signal), or with the error emitted by the child
 *   process itself (e.g. the executable could not be spawned).
 */
export function spawnPromise(
	command: string,
	args: string[] = [],
	options?: SpawnOptions,
): Promise<{ stdout: string; stderr: string }> {
	return new Promise((resolve, reject) => {
		const child = spawn(command, args, options);

		let stdout = "";
		let stderr = "";

		// Decode at the stream level: appending raw Buffer chunks to a string
		// stringifies each chunk on its own, which corrupts any multi-byte UTF-8
		// character that happens to be split across two chunks.
		child.stdout?.setEncoding("utf8");
		child.stderr?.setEncoding("utf8");

		child.stdout?.on("data", (chunk: string) => {
			stdout += chunk;
		});

		child.stderr?.on("data", (chunk: string) => {
			stderr += chunk;
		});

		child.on("close", (code) => {
			if (code !== 0) {
				reject(new Error(`Error code: ${code}\nstderr: ${stderr}`));
			} else {
				resolve({ stdout, stderr });
			}
		});

		// If both "error" and "close" fire, the first settlement wins; later
		// resolve/reject calls on the same promise are ignored.
		child.on("error", (err) => {
			reject(err);
		});
	});
}
|
|
|
|
/** A registered outbound route that requests can be sent through. */
interface Proxy {
	/** Transport kind. This file dispatches on "native" and "alicloud-fc". */
	type: string;
	/** Transport-specific payload: the region name for "alicloud-fc", an empty string for "native". */
	data: string;
}

/** A named unit of work bound to one provider and a set of usable proxies. */
interface Task {
	/** Provider name; used as part of the provider limiter's id. */
	provider: string;
	/** Explicit proxy names, or "all" to mean every proxy registered at lookup time. */
	proxies: string[] | "all";
}

/** Registered proxies keyed by proxy name. */
type ProxiesMap = Record<string, Proxy>;
|
|
|
|
/** Machine-readable failure categories carried by `NetSchedulerError.code`. */
type NetworkDelegateErrorCode =
	/** No proxy of the task could serve the request. */
	| "NO_PROXY_AVAILABLE"
	/** Declared for rate-limited proxies; not thrown anywhere in this file. */
	| "PROXY_RATE_LIMITED"
	/** The requested proxy name is not registered. */
	| "PROXY_NOT_FOUND"
	/** The native `fetch` call (or decoding its JSON body) failed. */
	| "FETCH_ERROR"
	/** The proxy's type has no request implementation. */
	| "NOT_IMPLEMENTED"
	/** The request through the Alibaba Cloud function proxy failed. */
	| "ALICLOUD_PROXY_ERR";
|
|
|
|
/**
 * Error raised by the network scheduler. Besides the human-readable message it
 * carries a machine-readable `code` and, when available, the underlying error.
 */
export class NetSchedulerError extends Error {
	/** Failure category; callers can branch on this instead of parsing the message. */
	public code: NetworkDelegateErrorCode;
	/** The original error that triggered this one, or `undefined` when there is none. */
	public rawError: unknown;

	/**
	 * @param message - Human-readable description.
	 * @param errorCode - Failure category stored in `code`.
	 * @param rawError - Optional underlying error, stored as-is in `rawError`.
	 */
	constructor(message: string, errorCode: NetworkDelegateErrorCode, rawError?: unknown) {
		super(message);
		this.name = "NetSchedulerError";
		this.code = errorCode;
		this.rawError = rawError;
	}
}
|
|
|
|
/** Rate limiters keyed by limiter id. */
type LimiterMap = Record<string, MultipleRateLimiter>;

/** Rate limiters keyed by limiter id; `null` marks an id that is explicitly unlimited. */
type OptionalLimiterMap = Record<string, MultipleRateLimiter | null>;

/** Registered tasks keyed by task name. */
type TaskMap = Record<string, Task>;
|
|
|
|
/**
 * Returns a randomly ordered copy of `array` (Fisher–Yates shuffle driven by
 * `Math.random`). The input array is left untouched.
 *
 * @param array - Items to shuffle.
 * @returns A new array holding the same items in random order.
 */
function shuffleArray<T>(array: T[]): T[] {
	const shuffled = Array.from(array);
	// `remaining` is the size of the not-yet-fixed prefix; each round moves one
	// randomly picked element of that prefix into its last slot.
	let remaining = shuffled.length;
	while (remaining > 1) {
		const last = remaining - 1;
		const pick = Math.floor(Math.random() * remaining);
		const held = shuffled[last];
		shuffled[last] = shuffled[pick];
		shuffled[pick] = held;
		remaining--;
	}
	return shuffled;
}
|
|
|
|
/**
 * Sends HTTP requests through a pool of named proxies, consulting per-task and
 * per-provider rate limiters before each request.
 *
 * Limiters are looked up by id:
 * - task limiter: `proxy-<proxyName>-<taskName>`
 * - provider limiter: `provider-<proxyName>-<providerName>`
 */
class NetworkDelegate {
	private proxies: ProxiesMap = {};
	private providerLimiters: LimiterMap = {};
	private proxyLimiters: OptionalLimiterMap = {};
	private tasks: TaskMap = {};

	/**
	 * Registers (or replaces) a proxy.
	 *
	 * @param proxyName - Unique proxy name.
	 * @param type - Transport kind; "native" and "alicloud-fc" are implemented.
	 * @param data - Transport-specific payload (the region for "alicloud-fc").
	 */
	addProxy(proxyName: string, type: string, data: string): void {
		this.proxies[proxyName] = { type, data };
	}

	/**
	 * Registers (or replaces) a task.
	 *
	 * @param taskName - Unique task name.
	 * @param provider - Provider the task talks to.
	 * @param proxies - Names of the proxies the task may use, or "all".
	 */
	addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
		this.tasks[taskName] = { provider, proxies };
	}

	/**
	 * Resolves the proxy names a task may use.
	 *
	 * @param taskName - Task to look up.
	 * @returns A fresh array: empty for an unknown task, every currently
	 *   registered proxy name for "all", otherwise a copy of the task's list.
	 */
	getTaskProxies(taskName: string): string[] {
		const task = this.tasks[taskName];
		if (!task) {
			return [];
		}
		if (task.proxies === "all") {
			return Object.keys(this.proxies);
		}
		// Copy so callers cannot mutate the registered task configuration.
		return [...task.proxies];
	}

	/**
	 * Installs a task-level limiter for every proxy the task can currently use.
	 *
	 * @param taskName - Task to configure. An unknown task resolves to no proxies, so nothing is installed.
	 * @param config - Limiter configuration, or `null` to mark the task as unlimited on those proxies.
	 */
	setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void {
		const proxies = this.getTaskProxies(taskName);
		for (const proxyName of proxies) {
			const limiterId = "proxy-" + proxyName + "-" + taskName;
			this.proxyLimiters[limiterId] = config ? new MultipleRateLimiter(limiterId, config) : null;
		}
	}

	/**
	 * Triggers the task limiter and then the provider limiter for a task/proxy pair.
	 * Missing (or `null`) limiters are skipped.
	 *
	 * @param task - Registered task name. An unregistered name makes this method
	 *   reject with a `TypeError`, because the task's provider cannot be read.
	 * @param proxy - Proxy name.
	 * @param force - Passed negated to each limiter's `trigger`; presumably
	 *   `trigger(false)` records the hit without enforcing the limit — confirm
	 *   against `MultipleRateLimiter`.
	 * @throws RateLimiterError - Re-thrown unchanged so that `request` can move
	 *   on to the next proxy. Every other error is logged and swallowed, so the
	 *   request still goes ahead (best effort).
	 */
	async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise<void> {
		const limiterId = "proxy-" + proxy + "-" + task;
		const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
		try {
			await this.proxyLimiters[limiterId]?.trigger(!force);
			await this.providerLimiters[providerLimiterId]?.trigger(!force);
		} catch (e) {
			if (e instanceof ReplyError) {
				logger.error(e as Error, "redis");
				// Already reported as a redis error; do not also report it as "unhandled".
				return;
			}
			if (e instanceof RateLimiterError) {
				// Re-throw it to ensure this.request can catch it
				throw e;
			}
			const message = e instanceof Error ? e.message : String(e);
			logger.warn(`Unhandled error: ${message}`, "mq", "proxyRequest");
		}
	}

	/**
	 * Installs a provider-level limiter on every proxy used by any task of the provider.
	 *
	 * @param providerName - Provider whose tasks are scanned.
	 * @param config - Limiter configuration shared by all created limiters.
	 */
	setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void {
		// Several tasks of one provider usually share proxies; a Set makes sure
		// each proxy gets exactly one limiter instead of being rebuilt per task.
		const boundProxies = new Set<string>();
		for (const taskName in this.tasks) {
			if (this.tasks[taskName].provider !== providerName) continue;
			for (const proxyName of this.getTaskProxies(taskName)) {
				boundProxies.add(proxyName);
			}
		}
		for (const proxyName of boundProxies) {
			const limiterId = "provider-" + proxyName + "-" + providerName;
			this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, config);
		}
	}

	/**
	 * Makes a request to the given URL through any proxy of the task that is not
	 * rate limited. Proxies are tried in random order.
	 *
	 * @param url - The URL to request.
	 * @param task - Name of the task the request belongs to.
	 * @param method - HTTP method. Default is "GET".
	 * @returns The parsed response body.
	 * @throws NetSchedulerError - In the following cases:
	 * - Every proxy of the task was rate limited, or the task has no proxies: code `NO_PROXY_AVAILABLE`
	 * - The native `fetch` function threw an error: code `FETCH_ERROR`
	 * - The alicloud-fc proxy failed: code `ALICLOUD_PROXY_ERR`
	 * - The proxy type is not supported: code `NOT_IMPLEMENTED`
	 *
	 * Only rate-limit rejections advance to the next proxy; any other error from
	 * the attempted proxy is propagated immediately.
	 */
	async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
		// find an available proxy
		const proxiesNames = this.getTaskProxies(task);
		for (const proxyName of shuffleArray(proxiesNames)) {
			try {
				return await this.proxyRequest<R>(url, proxyName, task, method);
			} catch (e) {
				if (e instanceof RateLimiterError) {
					continue;
				}
				throw e;
			}
		}
		throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
	}

	/**
	 * Makes a request to the given URL through one specific proxy.
	 *
	 * @param url - The URL to request.
	 * @param proxyName - The name of the proxy to use.
	 * @param task - The name of the task the request belongs to.
	 * @param method - HTTP method. Default is "GET". Only honoured by "native" proxies.
	 * @param force - If true, the request is made even if the proxy is rate limited. Default is false.
	 * @returns The parsed response body.
	 * @throws NetSchedulerError - In the following cases:
	 * - Proxy not found: code `PROXY_NOT_FOUND`
	 * - The native `fetch` function threw an error: code `FETCH_ERROR`
	 * - The alicloud-fc proxy failed: code `ALICLOUD_PROXY_ERR`
	 * - The proxy type is not supported: code `NOT_IMPLEMENTED`
	 * @throws RateLimiterError - Propagated from `triggerLimiter` when a limiter
	 *   rejects the request (no `PROXY_RATE_LIMITED` NetSchedulerError is created here).
	 */
	async proxyRequest<R>(
		url: string,
		proxyName: string,
		task: string,
		method: string = "GET",
		force: boolean = false,
	): Promise<R> {
		const proxy = this.proxies[proxyName];
		if (!proxy) {
			throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
		}

		await this.triggerLimiter(task, proxyName, force);
		const result = await this.makeRequest<R>(url, proxy, method);
		return result;
	}

	/** Dispatches the request to the implementation matching the proxy's type. */
	private async makeRequest<R>(url: string, proxy: Proxy, method: string): Promise<R> {
		switch (proxy.type) {
			case "native":
				return await this.nativeRequest<R>(url, method);
			case "alicloud-fc":
				return await this.alicloudFcRequest<R>(url, proxy.data);
			default:
				throw new NetSchedulerError(`Proxy type ${proxy.type} not supported`, "NOT_IMPLEMENTED");
		}
	}

	/**
	 * Requests the URL directly with `fetch` and parses the body as JSON.
	 * The `fetch` call is aborted after 10 * SECOND; reading the body is not
	 * covered by that timeout.
	 *
	 * @throws NetSchedulerError - Code `FETCH_ERROR` wrapping any failure
	 *   (network error, abort, invalid JSON) in `rawError`.
	 */
	private async nativeRequest<R>(url: string, method: string): Promise<R> {
		try {
			const controller = new AbortController();
			const timeout = setTimeout(() => controller.abort(), 10 * SECOND);

			// Clear the timer whether fetch succeeds or fails; otherwise a failed
			// request leaves a pending timer behind for the full timeout.
			const response = await fetch(url, {
				method,
				signal: controller.signal,
			}).finally(() => clearTimeout(timeout));

			return (await response.json()) as R;
		} catch (e) {
			throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
		}
	}

	/**
	 * Requests the URL through an Alibaba Cloud Function Compute proxy by
	 * invoking the function `proxy-<region>` with the `aliyun` CLI (profile
	 * `CVSA-<region>`). The CLI's stdout is expected to be JSON with a numeric
	 * `statusCode` and a JSON-encoded `body`.
	 *
	 * @param url - The URL the remote function should fetch.
	 * @param region - Region suffix used for the function name and CLI profile.
	 * @throws NetSchedulerError - Code `ALICLOUD_PROXY_ERR` for every failure
	 *   (CLI error, unparsable output, status code other than 200). The failure
	 *   is logged and the original error is kept in `rawError`.
	 */
	private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
		try {
			const output = await spawnPromise("aliyun", [
				"fc",
				"POST",
				`/2023-03-30/functions/proxy-${region}/invocations`,
				"--qualifier",
				"LATEST",
				"--header",
				"Content-Type=application/json;x-fc-invocation-type=Sync;x-fc-log-type=None;",
				"--body",
				JSON.stringify({ url: url }),
				"--retry-count",
				"5",
				"--read-timeout",
				"30",
				"--connect-timeout",
				"10",
				"--profile",
				`CVSA-${region}`,
			]);
			const out = output.stdout;
			const rawData = JSON.parse(out);
			if (rawData.statusCode !== 200) {
				// Deliberately thrown inside the try: the catch below logs it and
				// wraps it, keeping this error as `rawError`.
				// noinspection ExceptionCaughtLocallyJS
				throw new NetSchedulerError(
					`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
					"ALICLOUD_PROXY_ERR",
				);
			} else {
				return JSON.parse(rawData.body) as R;
			}
		} catch (e) {
			logger.error(e as Error, "net", "fn:alicloudFcRequest");
			throw new NetSchedulerError(
				`Unhandled error: Cannot proxy ${url} to ali-fc-${region}.`,
				"ALICLOUD_PROXY_ERR",
				e,
			);
		}
	}
}
|
|
|
|
// Module-wide scheduler instance; it is configured below and exported as the default export.
const networkDelegate = new NetworkDelegate();
|
|
/**
 * Builds a new limiter configuration from `base`, replacing each window's `max`
 * with the value at the same index in `maxima`. Every window object is copied,
 * so the result shares no objects with `base`.
 */
function withMaxima(base: RateLimiterConfig[], maxima: number[]): RateLimiterConfig[] {
	return base.map((window, index) => ({ ...window, max: maxima[index] ?? window.max }));
}

// Task-level limits for "getVideoInfo". Each entry allows `max` hits per
// `duration` (presumably seconds — confirm against MultipleRateLimiter).
const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
	{ duration: 0.3, max: 1 },
	{ duration: 3, max: 5 },
	{ duration: 30, max: 30 },
	{ duration: 2 * 60, max: 50 },
];

// Default provider-level limits.
const biliLimiterConfig: RateLimiterConfig[] = [
	{ duration: 1, max: 6 },
	{ duration: 5, max: 20 },
	{ duration: 30, max: 100 },
	{ duration: 5 * 60, max: 200 },
];

// NOTE: these presets used to be created with `[...biliLimiterConfig]`, a
// shallow copy whose elements are the very same objects. Assigning `.max` on
// one preset therefore rewrote all of them, and every provider silently ended
// up with the strict maxima (1 / 4 / 12 / 100). Each preset now owns its
// window objects, so the values below are the ones actually applied.

// Relaxed limits: same windows as the default, higher maxima.
const bili_test = withMaxima(biliLimiterConfig, [10, 36, 150, 1000]);

// Strict limits: same windows as the default, lower maxima.
const bili_strict = withMaxima(biliLimiterConfig, [1, 4, 12, 100]);
|
|
|
|
/*
Setup order: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter)

1. addProxy(proxyName, type, data)
   - Register every proxy first. A task declared with "all" resolves to the
     proxies that exist at the moment its proxies are looked up.
2. addTask(taskName, provider, proxies)
   - Declare tasks once the proxies exist, binding each task to a provider and
     to the proxies it may use.
   - Must happen before any limiter is configured for the task or its provider.
3. setTaskLimiter(taskName, config)
   - Creates one limiter per (proxy, task) pair, so it needs the task and its
     proxies to be registered already; otherwise nothing is created.
4. setProviderLimiter(providerName, config)
   - Creates one limiter per (proxy, provider) pair for every proxy used by a
     task of that provider; it has the same prerequisites as step 3.

Steps 3 and 4 may run in either order relative to each other, but both must
come after steps 1 and 2.
*/

const regions = ["shanghai", "hangzhou", "qingdao", "beijing", "zhangjiakou", "chengdu", "shenzhen", "hohhot"];

// 1. Proxies: the direct connection plus one Alibaba Cloud function per region.
networkDelegate.addProxy("native", "native", "");
regions.forEach((region) => {
	networkDelegate.addProxy(`alicloud-${region}`, "alicloud-fc", region);
});

// 2. Tasks.
const allAlicloudProxies = regions.map((region) => `alicloud-${region}`);
// Subset of regions shared by the two snapshot tasks below.
const snapshotProxies = [
	"alicloud-qingdao",
	"alicloud-shanghai",
	"alicloud-zhangjiakou",
	"alicloud-chengdu",
	"alicloud-shenzhen",
	"alicloud-hohhot",
];

networkDelegate.addTask("getVideoInfo", "bilibili", "all");
networkDelegate.addTask("getLatestVideos", "bilibili", "all");
networkDelegate.addTask("snapshotMilestoneVideo", "bilibili", allAlicloudProxies);
// Each task receives its own array so the two registrations stay independent.
networkDelegate.addTask("snapshotVideo", "bili_test", [...snapshotProxies]);
networkDelegate.addTask("bulkSnapshot", "bili_strict", [...snapshotProxies]);

// 3. Task-level limiters: only "getVideoInfo" is limited per task.
networkDelegate.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
for (const unlimitedTask of ["getLatestVideos", "snapshotMilestoneVideo", "snapshotVideo", "bulkSnapshot"]) {
	networkDelegate.setTaskLimiter(unlimitedTask, null);
}

// 4. Provider-level limiters.
networkDelegate.setProviderLimiter("bilibili", biliLimiterConfig);
networkDelegate.setProviderLimiter("bili_test", bili_test);
networkDelegate.setProviderLimiter("bili_strict", bili_strict);

export default networkDelegate;
|