1
0

add: support for IP proxies

This commit is contained in:
alikia2x (寒寒) 2025-12-14 04:35:59 +08:00
parent 24d0833072
commit fc610d417b
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS.
GPG Key ID: 56209E0CCD8420C6
17 changed files with 539 additions and 270 deletions

View File

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DataSourcePerFileMappings">
<file url="file://$PROJECT_DIR$/packages/crawler/db/snapshot.ts" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
<file url="file://$PROJECT_DIR$/packages/crawler/mq/task/removeAllTimeoutSchedules.ts" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
</component>
</project>

View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="db-tree-configuration">
<option name="data" value="----------------------------------------&#10;1:0:3143734c-9dde-47a1-a57b-4d6865cff2da&#10;2:0:0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef&#10;" />
</component>
</project>

View File

@ -15,7 +15,7 @@ export const pingHandler = new Elysia({ prefix: "/ping" }).use(ip()).get(
url: request.url
},
response: {
time: new Date().getTime(),
time: Date.now(),
status: 200,
version: VERSION
}

View File

@ -10,7 +10,7 @@ export const getGroundTruthMilestoneETA = async (
const DELTA = 1e-5;
let minETAHours = Infinity;
const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const currentTimestamp = new Date().getTime();
const currentTimestamp = Date.now();
const latestSnapshot = await getLatestSnapshot(aid);
const latestSnapshotTime = new Date(latestSnapshot.time).getTime();
for (const timeInterval of timeIntervals) {

View File

@ -7,7 +7,6 @@ interface BaseResponse<T> {
export type VideoListResponse = BaseResponse<VideoListData>;
export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
export type VideoTagsResponse = BaseResponse<VideoTagsData>;
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;

View File

@ -1,3 +1,5 @@
// noinspection ExceptionCaughtLocallyJS
import logger from "@core/log";
import {
MultipleRateLimiter,
@ -13,7 +15,7 @@ import Stream from "@alicloud/darabonba-stream";
import * as Util from "@alicloud/tea-util";
import { Readable } from "stream";
type ProxyType = "native" | "alicloud-fc" | "baidu-cfc";
type ProxyType = "native" | "alicloud-fc" | "ip-proxy";
interface FCResponse {
statusCode: number;
@ -21,18 +23,71 @@ interface FCResponse {
serverTime: number;
}
interface Proxy {
interface NativeProxyData {
}
interface AlicloudFcProxyData {
region: string;
timeout?: number;
}
// Types describing the pooled IP-proxy system.
/**
 * One proxy endpoint held in an IP pool.
 */
interface IPEntry {
/** Host or IP address of the proxy endpoint. */
address: string;
/**
 * Lifespan of this IP address in milliseconds, counted from `createdAt`.
 */
lifespan: number;
/** Port of the proxy endpoint; optional in the type. */
port?: number;
/**
 * When this IP was created, UNIX timestamp in milliseconds.
 */
createdAt: number;
/** Set to true by the pool once the entry has been handed out. */
used: boolean;
}
/** Async source of fresh proxy entries; called by the pool on every refresh. */
type IPExtractor = () => Promise<IPEntry[]>;
/** How the pool picks the next entry to hand out. */
type IPRotationStrategy = "single-use" | "round-robin" | "random";
/**
 * Settings for one "ip-proxy" proxy definition. Every field except
 * `extractor` is optional and falls back to the default noted beside it.
 */
interface IPProxyConfig {
extractor: IPExtractor;
strategy?: IPRotationStrategy; // defaults to "single-use"
minPoolSize?: number; // minimum IPs to maintain (default: 5)
maxPoolSize?: number; // maximum IPs to cache (default: 50)
refreshInterval?: number; // how often to check for new IPs, in milliseconds (default: 30s)
// NOTE(review): stored by IPPoolManager but not read by any pool logic visible in this file.
initialPoolSize?: number; // how many IPs to fetch initially (default: 10)
}
type ProxyData = NativeProxyData | AlicloudFcProxyData | IPProxyConfig;
interface ProxyDef<T extends ProxyData = ProxyData> {
type: ProxyType;
data: string;
data: T;
}
interface Task {
provider: string;
proxies: string[] | "all";
/**
 * Type guard: narrows a proxy definition to the Alibaba Cloud FC variant
 * by inspecting its `type` tag.
 */
function isAlicloudFcProxy(proxy: ProxyDef): proxy is ProxyDef<AlicloudFcProxyData> {
    const { type: tag } = proxy;
    return tag === "alicloud-fc";
}
interface ProxiesMap {
[name: string]: Proxy;
/**
 * Type guard: narrows a proxy definition to the pooled IP-proxy variant
 * by inspecting its `type` tag.
 */
function isIpProxy(proxy: ProxyDef): proxy is ProxyDef<IPProxyConfig> {
    const { type: tag } = proxy;
    return tag === "ip-proxy";
}
/**
 * A remote service that tasks talk to. Its limiters are instantiated once per
 * (proxy, provider) pair for every proxy used by a task of this provider.
 */
interface ProviderDef {
limiters: readonly RateLimiterConfig[];
}
/**
 * A named kind of request.
 * - `provider`: key of the provider this task belongs to.
 * - `proxies`: proxy keys the task may use, or "all" for every configured proxy.
 * - `limiters`: optional task-level limits, instantiated once per (proxy, task) pair.
 */
interface TaskDef<ProxyKeys extends string = string, ProviderKeys extends string = string> {
provider: ProviderKeys;
proxies: readonly ProxyKeys[] | "all";
limiters?: readonly RateLimiterConfig[];
}
/**
 * Complete declarative configuration consumed by the NetworkDelegate constructor.
 */
interface NetworkConfig {
proxies: Record<string, ProxyDef>;
providers: Record<string, ProviderDef>;
// `any` leaves the task's proxy/provider key types unconstrained here.
tasks: Record<string, TaskDef<any, any>>;
}
type NetworkDelegateErrorCode =
@ -41,7 +96,9 @@ type NetworkDelegateErrorCode =
| "PROXY_NOT_FOUND"
| "FETCH_ERROR"
| "NOT_IMPLEMENTED"
| "ALICLOUD_PROXY_ERR";
| "ALICLOUD_PROXY_ERR"
| "IP_POOL_EXHAUSTED"
| "IP_EXTRACTION_FAILED";
export class NetSchedulerError extends Error {
public code: NetworkDelegateErrorCode;
@ -54,34 +111,153 @@ export class NetSchedulerError extends Error {
}
}
type LimiterMap = {
[name: string]: MultipleRateLimiter;
};
type OptionalLimiterMap = {
[name: string]: MultipleRateLimiter | null;
};
type TaskMap = {
[name: string]: Task;
};
function shuffleArray<T>(array: T[]): T[] {
const newArray = [...array]; // Create a shallow copy to avoid in-place modification
function shuffleArray<T>(array: readonly T[]): T[] {
const newArray = [...array];
for (let i = newArray.length - 1; i > 0; i--) {
const j = Math.floor(Math.random() * (i + 1));
[newArray[i], newArray[j]] = [newArray[j], newArray[i]]; // Swap elements
[newArray[i], newArray[j]] = [newArray[j], newArray[i]];
}
return newArray;
}
/**
 * Keeps a cache of proxy IPs supplied by an extractor callback and hands them
 * out one at a time.
 *
 * Lifecycle: `initialize()` performs a first refresh and starts a periodic
 * refresh timer; `stop()` cancels that timer. `getNextIP()` purges expired
 * entries, selects an entry according to the configured strategy and, when
 * nothing usable is left, refreshes the pool on demand.
 *
 * NOTE(review): every strategy retires the entry as soon as it is selected
 * (the original implementation did the same), so "round-robin" and "random"
 * only differ from "single-use" in pick order. `initialPoolSize` is stored
 * but not consulted because the extractor takes no arguments.
 */
class IPPoolManager {
    private pool: IPEntry[] = [];
    private readonly config: Required<IPProxyConfig>;
    // Optional: nothing is scheduled until initialize() has run.
    protected refreshTimer?: ReturnType<typeof setInterval>;
    // Shared promise of the refresh currently running, if any (single-flight).
    private refreshInFlight: Promise<void> | null = null;

    /**
     * @param config Pool settings; omitted optional fields fall back to
     *   "single-use", 5, 50, 30 000 ms and 10 respectively.
     */
    constructor(config: IPProxyConfig) {
        this.config = {
            extractor: config.extractor,
            strategy: config.strategy ?? "single-use",
            minPoolSize: config.minPoolSize ?? 5,
            maxPoolSize: config.maxPoolSize ?? 50,
            refreshInterval: config.refreshInterval ?? 30_000,
            initialPoolSize: config.initialPoolSize ?? 10
        };
    }

    /** Fills the pool once, then starts the periodic refresh. */
    async initialize(): Promise<void> {
        await this.refreshPool();
        this.startPeriodicRefresh();
    }

    /** Cancels the periodic refresh so the timer no longer keeps the process alive. */
    stop(): void {
        if (this.refreshTimer !== undefined) {
            clearInterval(this.refreshTimer);
            this.refreshTimer = undefined;
        }
    }

    private startPeriodicRefresh(): void {
        // Guard against stacked intervals if initialize() is called twice.
        this.stop();
        this.refreshTimer = setInterval(() => {
            // refreshPool() never rejects: failures are logged inside it.
            void this.refreshPool();
        }, this.config.refreshInterval);
    }

    /**
     * Returns the next usable IP, or `null` when none can be obtained.
     * The returned entry is already flagged as used.
     */
    async getNextIP(): Promise<IPEntry | null> {
        this.cleanExpiredIPs();

        let selectedIP = this.selectByStrategy();

        // Used entries stay in the pool until they expire, so the refill
        // decision is based on *unused* entries rather than the raw pool size.
        if (!selectedIP && this.countAvailable() < this.config.minPoolSize) {
            await this.refreshPool();
            selectedIP = this.selectByStrategy();
        }

        return selectedIP;
    }

    /** Dispatches to the picker matching the configured strategy. */
    private selectByStrategy(): IPEntry | null {
        switch (this.config.strategy) {
            case "single-use":
                return this.getAvailableIP();
            case "round-robin":
                return this.getRoundRobinIP();
            case "random":
                return this.getRandomIP();
            default:
                return null;
        }
    }

    private countAvailable(): number {
        return this.pool.reduce((count, ip) => (ip.used ? count : count + 1), 0);
    }

    /** Picks the oldest unused entry and retires it. */
    private getAvailableIP(): IPEntry | null {
        const selectedIP = this.pool.find((ip) => !ip.used);
        if (!selectedIP) return null;
        selectedIP.used = true;
        return selectedIP;
    }

    /** Currently identical to the single-use picker (first unused entry, retired on pick). */
    private getRoundRobinIP(): IPEntry | null {
        return this.getAvailableIP();
    }

    /** Picks a uniformly random unused entry and retires it. */
    private getRandomIP(): IPEntry | null {
        const availableIPs = this.pool.filter((ip) => !ip.used);
        if (availableIPs.length === 0) return null;
        const selectedIP = availableIPs[Math.floor(Math.random() * availableIPs.length)];
        selectedIP.used = true;
        return selectedIP;
    }

    /** Drops every entry whose `createdAt + lifespan` is not in the future. */
    private cleanExpiredIPs(): void {
        const now = Date.now();
        this.pool = this.pool.filter((ip) => ip.createdAt + ip.lifespan > now);
    }

    /**
     * Refreshes the pool. Concurrent callers share one in-flight refresh and
     * all resolve when it finishes. Never rejects.
     */
    private refreshPool(): Promise<void> {
        if (!this.refreshInFlight) {
            this.refreshInFlight = this.extractIntoPool().finally(() => {
                this.refreshInFlight = null;
            });
        }
        return this.refreshInFlight;
    }

    /** Calls the extractor and appends new, live, non-duplicate entries up to `maxPoolSize`. */
    private async extractIntoPool(): Promise<void> {
        try {
            logger.debug("Refreshing IP pool", "net", "IPPoolManager.refreshPool");

            const extractedIPs = await this.config.extractor();

            // Free the slots held by expired entries before computing capacity.
            this.cleanExpiredIPs();
            const now = Date.now();
            const known = new Set(this.pool.map((ip) => IPPoolManager.endpointKey(ip)));
            // Clamp: a negative count would make a slice-style cut keep almost everything.
            let capacity = Math.max(0, this.config.maxPoolSize - this.pool.length);

            for (const ipData of extractedIPs) {
                if (capacity === 0) break;
                // Keep the extractor's timestamp so a stale entry is not given extra life.
                const createdAt = Number.isFinite(ipData.createdAt) ? ipData.createdAt : now;
                // Also rejects NaN lifespans, which can never compare greater than now.
                if (!(createdAt + ipData.lifespan > now)) continue;
                const key = IPPoolManager.endpointKey(ipData);
                if (known.has(key)) continue;
                known.add(key);
                this.pool.push({ ...ipData, createdAt, used: false });
                capacity--;
            }

            logger.debug(
                `IP pool refreshed. Pool size: ${this.pool.length}`,
                "net",
                "IPPoolManager.refreshPool"
            );
        } catch (error) {
            logger.error(error as Error, "net", "IPPoolManager.refreshPool");
        }
    }

    private static endpointKey(ip: { address: string; port?: number }): string {
        return `${ip.address}:${ip.port ?? ""}`;
    }

    /** Flags the first pooled entry with the given address as used; no-op if absent. */
    async markIPUsed(address: string): Promise<void> {
        const ip = this.pool.find((p) => p.address === address);
        if (ip) {
            ip.used = true;
        }
    }
}
const getEndpoint = (region: string) => `fcv3.cn-${region}.aliyuncs.com`;
const getAlicloudClient = (region: string) => {
const credential = new Credential();
const config = new OpenApi.Config({
credential: credential
});
const config = new OpenApi.Config({ credential: credential });
config.endpoint = getEndpoint(region);
return new FC20230330(config);
};
@ -94,94 +270,121 @@ const streamToString = async (readableStream: Readable) => {
return data;
};
class NetworkDelegate {
private proxies: ProxiesMap = {};
private providerLimiters: LimiterMap = {};
private proxyLimiters: OptionalLimiterMap = {};
private tasks: TaskMap = {};
export class NetworkDelegate<const C extends NetworkConfig> {
private readonly proxies: Record<string, ProxyDef>;
private readonly tasks: Record<string, { provider: string; proxies: string[] }>;
private readonly ipPools: Record<string, IPPoolManager> = {};
addProxy(proxyName: string, type: ProxyType, data: string): void {
this.proxies[proxyName] = { type, data };
}
private providerLimiters: Record<string, MultipleRateLimiter> = {};
private proxyLimiters: Record<string, MultipleRateLimiter> = {};
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
this.tasks[taskName] = { provider, proxies };
}
constructor(config: C) {
this.proxies = config.proxies;
this.tasks = {};
this.ipPools = {};
getTaskProxies(taskName: string): string[] {
if (!this.tasks[taskName]) {
return [];
// Initialize IP pools for ip-proxy configurations
for (const [proxyName, proxyDef] of Object.entries(this.proxies)) {
if (isIpProxy(proxyDef)) {
this.ipPools[proxyName] = new IPPoolManager(proxyDef.data);
// Initialize asynchronously but don't wait
this.ipPools[proxyName].initialize().catch(error => {
logger.error(error as Error, "net", `Failed to initialize IP pool for ${proxyName}`);
});
}
}
if (this.tasks[taskName].proxies === "all") {
return Object.keys(this.proxies);
}
return this.tasks[taskName].proxies;
}
setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void {
const proxies = this.getTaskProxies(taskName);
for (const proxyName of proxies) {
const limiterId = "proxy-" + proxyName + "-" + taskName;
this.proxyLimiters[limiterId] = config
? new MultipleRateLimiter(limiterId, config)
: null;
const allProxyNames = Object.keys(this.proxies);
for (const [taskName, taskDef] of Object.entries(config.tasks)) {
const targetProxies =
taskDef.proxies === "all" ? allProxyNames : (taskDef.proxies as readonly string[]);
for (const p of targetProxies) {
if (!this.proxies[p]) {
throw new Error(`Task ${taskName} references missing proxy: ${p}`);
}
}
this.tasks[taskName] = {
provider: taskDef.provider,
proxies: [...targetProxies]
};
if (taskDef.limiters && taskDef.limiters.length > 0) {
for (const proxyName of targetProxies) {
const limiterId = `proxy-${proxyName}-${taskName}`;
this.proxyLimiters[limiterId] = new MultipleRateLimiter(limiterId, [
...taskDef.limiters
]);
}
}
}
for (const [providerName, providerDef] of Object.entries(config.providers)) {
if (!providerDef.limiters || providerDef.limiters.length === 0) continue;
const boundProxies = new Set<string>();
for (const [_taskName, taskImpl] of Object.entries(this.tasks)) {
if (taskImpl.provider === providerName) {
taskImpl.proxies.forEach((p) => boundProxies.add(p));
}
}
for (const proxyName of boundProxies) {
const limiterId = `provider-${proxyName}-${providerName}`;
if (!this.providerLimiters[limiterId]) {
this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, [
...providerDef.limiters
]);
}
}
}
}
async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise<void> {
const limiterId = "proxy-" + proxy + "-" + task;
const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
private async triggerLimiter(
taskName: string,
proxyName: string,
force: boolean = false
): Promise<void> {
const taskImpl = this.tasks[taskName];
if (!taskImpl) return;
const proxyLimiterId = `proxy-${proxyName}-${taskName}`;
const providerLimiterId = `provider-${proxyName}-${taskImpl.provider}`;
try {
await this.proxyLimiters[limiterId]?.trigger(!force);
await this.providerLimiters[providerLimiterId]?.trigger(!force);
if (this.proxyLimiters[proxyLimiterId]) {
await this.proxyLimiters[proxyLimiterId].trigger(!force);
}
if (this.providerLimiters[providerLimiterId]) {
await this.providerLimiters[providerLimiterId].trigger(!force);
}
} catch (e) {
const error = e as Error;
if (e instanceof ReplyError) {
logger.error(error, "redis", "fn:triggerLimiter");
} else if (e instanceof RateLimiterError) {
// Re-throw it to ensure this.request can catch it
throw e;
} else {
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
}
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
}
}
setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void {
let bindProxies: string[] = [];
for (const taskName in this.tasks) {
if (this.tasks[taskName].provider !== providerName) continue;
const proxies = this.getTaskProxies(taskName);
bindProxies = bindProxies.concat(proxies);
}
for (const proxyName of bindProxies) {
const limiterId = "provider-" + proxyName + "-" + providerName;
this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, config);
}
}
async request<R>(url: string, task: keyof C["tasks"]): Promise<{ data: R; time: number }> {
const taskName = task as string;
const taskImpl = this.tasks[taskName];
if (!taskImpl) {
throw new Error(`Task definition missing for ${taskName}`);
}
const proxiesNames = taskImpl.proxies;
/*
* Make a request to the specified URL with any available proxy
* @param {string} url - The URL to request.
* @param {string} method - The HTTP method to use for the request. Default is "GET".
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async request<R>(
url: string,
task: string
): Promise<{
data: R;
time: number;
}> {
// find a available proxy
const proxiesNames = this.getTaskProxies(task);
for (const proxyName of shuffleArray(proxiesNames)) {
try {
return await this.proxyRequest<R>(url, proxyName, task);
return await this.proxyRequest<R>(url, proxyName, taskName);
} catch (e) {
if (e instanceof RateLimiterError) {
continue;
@ -192,30 +395,12 @@ class NetworkDelegate {
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
}
/*
* Make a request to the specified URL with the specified proxy
* @param {string} url - The URL to request.
* @param {string} proxyName - The name of the proxy to use.
* @param {string} task - The name of the task to use.
* @param {string} method - The HTTP method to use for the request. Default is "GET".
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - Proxy not found: with error code `PROXY_NOT_FOUND`
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async proxyRequest<R>(
url: string,
proxyName: string,
task: string,
force: boolean = false
): Promise<{
data: R;
time: number;
}> {
): Promise<{ data: R; time: number }> {
const proxy = this.proxies[proxyName];
if (!proxy) {
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
@ -225,18 +410,20 @@ class NetworkDelegate {
return this.makeRequest<R>(url, proxy);
}
private async makeRequest<R>(
url: string,
proxy: Proxy
): Promise<{
data: R;
time: number;
}> {
private async makeRequest<R>(url: string, proxy: ProxyDef): Promise<{ data: R; time: number }> {
switch (proxy.type) {
case "native":
return await this.nativeRequest<R>(url);
case "alicloud-fc":
if (!isAlicloudFcProxy(proxy)) {
throw new NetSchedulerError("Invalid alicloud-fc proxy configuration", "ALICLOUD_PROXY_ERR");
}
return await this.alicloudFcRequest<R>(url, proxy.data);
case "ip-proxy":
if (!isIpProxy(proxy)) {
throw new NetSchedulerError("Invalid ip-proxy configuration", "NOT_IMPLEMENTED");
}
return await this.ipProxyRequest<R>(url, proxy);
default:
throw new NetSchedulerError(
`Proxy type ${proxy.type} not supported`,
@ -245,28 +432,19 @@ class NetworkDelegate {
}
}
private async nativeRequest<R>(url: string): Promise<{
data: R;
time: number;
}> {
private async nativeRequest<R>(url: string): Promise<{ data: R; time: number }> {
try {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), 10 * SECOND);
const response = await fetch(url, {
signal: controller.signal
});
const response = await fetch(url, { signal: controller.signal });
clearTimeout(timeout);
const start = Date.now();
const data = await response.json();
const end = Date.now();
const serverTime = start + (end - start) / 2;
return {
data: data as R,
time: serverTime
};
return { data: data as R, time: serverTime };
} catch (e) {
throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
}
@ -274,37 +452,33 @@ class NetworkDelegate {
private async alicloudFcRequest<R>(
url: string,
region: string
): Promise<{
data: R;
time: number;
}> {
proxyData: AlicloudFcProxyData
): Promise<{ data: R; time: number }> {
try {
const client = getAlicloudClient(region);
const client = getAlicloudClient(proxyData.region);
const bodyStream = Stream.readFromString(JSON.stringify({ url: url }));
const headers = new $FC20230330.InvokeFunctionHeaders({});
const request = new $FC20230330.InvokeFunctionRequest({
body: bodyStream
});
const request = new $FC20230330.InvokeFunctionRequest({ body: bodyStream });
const runtime = new Util.RuntimeOptions({});
const response = await client.invokeFunctionWithOptions(
`proxy-${region}`,
`proxy-${proxyData.region}`,
request,
headers,
runtime
);
if (response.statusCode !== 200) {
// noinspection ExceptionCaughtLocallyJS
throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${response.statusCode} (Not correctly invoked).`,
`Error proxying ${url} to ali-fc region ${proxyData.region}, code: ${response.statusCode}`,
"ALICLOUD_PROXY_ERR"
);
}
const rawData = JSON.parse(await streamToString(response.body)) as FCResponse;
if (rawData.statusCode !== 200) {
// noinspection ExceptionCaughtLocallyJS
throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}. (fetch error)`,
`Error proxying ${url} to ali-fc region ${proxyData.region}, remote code: ${rawData.statusCode}`,
"ALICLOUD_PROXY_ERR"
);
} else {
@ -316,102 +490,177 @@ class NetworkDelegate {
} catch (e) {
logger.error(e as Error, "net", "fn:alicloudFcRequest");
throw new NetSchedulerError(
`Unhandled error: Cannot proxy ${url} to ali-fc-${region}.`,
`Unhandled error: Cannot proxy ${url} to ali-fc-${proxyData.region}.`,
"ALICLOUD_PROXY_ERR",
e
);
}
}
/**
 * Sends a GET request through one IP taken from the pool that belongs to `proxyDef`.
 *
 * The request (including reading the body) is aborted once the IP's remaining
 * lifetime has elapsed. The IP is flagged as used afterwards whether the
 * request succeeded or not.
 *
 * @param url - The URL to request.
 * @param proxyDef - The "ip-proxy" definition whose pool supplies the IP.
 * @returns The parsed JSON body and an estimated server timestamp (midpoint of the body read).
 * @throws {NetSchedulerError} `IP_POOL_EXHAUSTED` when the pool is missing or has no IP;
 *   `IP_EXTRACTION_FAILED` when the proxied request or JSON parsing fails.
 */
private async ipProxyRequest<R>(
    url: string,
    proxyDef: ProxyDef<IPProxyConfig>
): Promise<{ data: R; time: number }> {
    // Pools are keyed by proxy name, so recover the name by object identity.
    const proxyName = Object.keys(this.proxies).find((name) => this.proxies[name] === proxyDef);
    const ipPool = proxyName === undefined ? undefined : this.ipPools[proxyName];
    if (!ipPool) {
        throw new NetSchedulerError("IP pool not found", "IP_POOL_EXHAUSTED");
    }

    const ipEntry = await ipPool.getNextIP();
    if (!ipEntry) {
        throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED");
    }

    // setTimeout treats negative delays as 0 and overflows above 2^31-1 ms
    // (firing almost immediately), so clamp the remaining lifetime into range.
    const MAX_TIMER_DELAY = 2_147_483_647;
    const remaining = ipEntry.createdAt + ipEntry.lifespan - Date.now();
    const delay = Math.min(Math.max(remaining, 0), MAX_TIMER_DELAY);

    // `port` is optional on IPEntry; avoid producing "http://host:undefined".
    const proxyUrl =
        ipEntry.port === undefined
            ? `http://${ipEntry.address}`
            : `http://${ipEntry.address}:${ipEntry.port}`;

    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), delay);
    try {
        const response = await fetch(url, {
            signal: controller.signal,
            proxy: proxyUrl
        });

        const start = Date.now();
        const data = await response.json();
        const end = Date.now();
        const serverTime = start + (end - start) / 2;

        return { data: data as R, time: serverTime };
    } catch (error) {
        throw new NetSchedulerError("IP proxy request failed", "IP_EXTRACTION_FAILED", error);
    } finally {
        // Runs on success and failure alike: no leaked timer, IP always retired.
        clearTimeout(timeout);
        await ipPool.markIPUsed(ipEntry.address);
    }
}
}
const networkDelegate = new NetworkDelegate();
const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
{
duration: 0.3,
max: 1
},
{
duration: 3,
max: 5
},
{
duration: 30,
max: 30
},
{
duration: 2 * 60,
max: 50
}
];
const biliLimiterConfig: RateLimiterConfig[] = [
{
duration: 1,
max: 20
},
{
duration: 15,
max: 130
},
{
duration: 5 * 60,
max: 2000
}
{ duration: 1, max: 20 },
{ duration: 15, max: 130 },
{ duration: 5 * 60, max: 2000 }
];
const bili_normal = [...biliLimiterConfig];
const bili_normal = structuredClone(biliLimiterConfig);
bili_normal[0].max = 5;
bili_normal[1].max = 40;
bili_normal[2].max = 200;
const bili_strict = [...biliLimiterConfig];
const bili_strict = structuredClone(biliLimiterConfig);
bili_strict[0].max = 1;
bili_strict[1].max = 6;
bili_strict[2].max = 100;
/*
Execution order for setup:
const aliRegions = ["hangzhou"] as const;
1. addProxy(proxyName, type, data):
- Must be called first. Registers proxies in the system, making them available for tasks.
- Define all proxies before proceeding to define tasks or set up limiters.
2. addTask(taskName, provider, proxies):
- Call after addProxy. Defines tasks and associates them with providers and proxies.
- Relies on proxies being already added.
- Must be called before setting task-specific or provider-specific limiters.
3. setTaskLimiter(taskName, config):
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
const proxies = {
native: {
type: "native" as const,
data: {}
},
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
The order of setTaskLimiter and setProviderLimiter relative to each other is flexible,
but both should come after addProxy and addTask to ensure proper setup and dependencies are met.
*/
alicloud_hangzhou: {
type: "alicloud-fc" as const,
data: {
region: "hangzhou",
timeout: 15000
}
},
const aliRegions = ["beijing", "hangzhou"];
const fcProxies = aliRegions.map((region) => `alicloud-${region}`);
const fcProxiesL = aliRegions.slice(1).map((region) => `alicloud-${region}`);
networkDelegate.addProxy("native", "native", "");
for (const region of aliRegions) {
networkDelegate.addProxy(`alicloud-${region}`, "alicloud-fc", region);
}
ip_proxy_pool: {
type: "ip-proxy" as const,
data: {
// Fetches a batch of proxy endpoints from the HTTP API configured in the
// IP_PROXY_EXTRACTOR_URL environment variable and converts them to IPEntry
// objects. Throws when the variable is unset or the API reports a non-zero code.
extractor: async (): Promise<IPEntry[]> => {
    interface APIResponse {
        code: number;
        data: {
            ip: string;
            port: number;
            endtime: string;
            city: string;
        }[];
    }
    const url = Bun.env.IP_PROXY_EXTRACTOR_URL;
    if (!url) {
        // Fail with a clear message instead of letting fetch(undefined) throw.
        throw new Error("IP_PROXY_EXTRACTOR_URL is not set");
    }
    const response = await fetch(url);
    const data = (await response.json()) as APIResponse;
    if (data.code !== 0) {
        throw new Error(`IP proxy extractor failed with code ${data.code}`);
    }
    const items = Array.isArray(data.data) ? data.data : [];
    // Sample the clock once so lifespan and createdAt agree for every entry.
    const now = Date.now();
    const entries: IPEntry[] = [];
    for (const item of items) {
        // `endtime` carries no zone; '+08' is appended so it is parsed as UTC+8.
        const lifespan = Date.parse(item.endtime + '+08') - now;
        // Skip entries whose end time is unparseable (NaN) or already past.
        if (!Number.isFinite(lifespan) || lifespan <= 0) continue;
        entries.push({
            address: item.ip,
            port: item.port,
            lifespan,
            createdAt: now,
            used: false
        });
    }
    return entries;
},
strategy: "round-robin",
minPoolSize: 10,
maxPoolSize: 500,
refreshInterval: 5 * SECOND,
initialPoolSize: 10
}
}
} satisfies Record<string, ProxyDef>;
networkDelegate.addTask("test", "test", "all");
networkDelegate.addTask("getVideoInfo", "bilibili", "all");
networkDelegate.addTask("getLatestVideos", "bilibili", "all");
networkDelegate.addTask("snapshotMilestoneVideo", "bilibili", fcProxies);
networkDelegate.addTask("snapshotVideo", "bilibili", fcProxiesL);
networkDelegate.addTask("bulkSnapshot", "bilibili", fcProxiesL);
type MyProxyKeys = keyof typeof proxies;
networkDelegate.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
networkDelegate.setTaskLimiter("bulkSnapshot", bili_strict);
networkDelegate.setTaskLimiter("getLatestVideos", bili_strict);
networkDelegate.setTaskLimiter("getVideoInfo", bili_strict);
networkDelegate.setTaskLimiter("snapshotVideo", bili_normal);
networkDelegate.setProviderLimiter("test", []);
networkDelegate.setProviderLimiter("bilibili", biliLimiterConfig);
const fcProxies = aliRegions.map((region) => `alicloud_${region}`) as MyProxyKeys[];
// Declarative network configuration passed to the NetworkDelegate constructor.
// `as const satisfies NetworkConfig` validates the shape while keeping the task
// keys as literals, which is what `RequestTasks` is derived from.
const config = {
proxies: proxies,
providers: {
// Empty limiter list: no provider-level rate limiting for "test".
test: { limiters: [] },
bilibili: { limiters: biliLimiterConfig }
},
tasks: {
// Test task routed through the Alibaba Cloud FC proxies only.
test: {
provider: "test",
proxies: fcProxies
},
// Test task routed through the IP proxy pool only.
test_ip: {
provider: "test",
proxies: ["ip_proxy_pool"]
},
// "all" lets these tasks use every proxy defined in `proxies`.
getVideoInfo: {
provider: "bilibili",
proxies: "all",
limiters: bili_strict
},
getLatestVideos: {
provider: "bilibili",
proxies: "all",
limiters: bili_strict
},
// Snapshot tasks go through the IP proxy pool exclusively.
// No task-level limiters here; only the provider-level ones apply.
snapshotMilestoneVideo: {
provider: "bilibili",
proxies: ["ip_proxy_pool"]
},
snapshotVideo: {
provider: "bilibili",
proxies: ["ip_proxy_pool"],
limiters: bili_normal
},
bulkSnapshot: {
provider: "bilibili",
proxies: ["ip_proxy_pool"],
limiters: bili_strict
}
}
} as const satisfies NetworkConfig;
export type RequestTasks = keyof typeof config.tasks;
const networkDelegate = new NetworkDelegate(config);
export default networkDelegate;

View File

@ -2,15 +2,9 @@ import networkDelegate from "@core/net/delegate";
import type { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d";
import logger from "@core/log";
export async function getVideoDetails(
aid: number,
archive: boolean = false
): Promise<VideoDetailsData | null> {
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
const { data } = await networkDelegate.request<VideoDetailsResponse>(
url,
archive ? "" : "getVideoInfo"
);
const { data } = await networkDelegate.request<VideoDetailsResponse>(url, "getVideoInfo");
const errMessage = `Error fetching metadata for ${aid}:`;
if (data.code !== 0) {
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");

View File

@ -17,7 +17,7 @@ import logger from "@core/log";
*/
export async function getVideoInfo(
aid: number,
task: string
task: "snapshotVideo" | "getVideoInfo" | "snapshotMilestoneVideo"
): Promise<
| {
data: VideoInfoData;
@ -37,30 +37,3 @@ export async function getVideoInfo(
time: time
};
}
/*
* Fetch video metadata from bilibili API by BVID
* @param {string} bvid - The video's BVID
* @param {string} task - The task name used in scheduler. It can be one of the following:
* - snapshotVideo
* - getVideoInfo
* - snapshotMilestoneVideo
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function getVideoInfoByBV(
bvid: string,
task: string
): Promise<VideoInfoData | number> {
const url = `https://api.bilibili.com/x/web-interface/view?bvid=${bvid}`;
const { data } = await networkDelegate.request<VideoInfoResponse>(url, task);
const errMessage = `Error fetching metadata for ${bvid}:`;
if (data.code !== 0) {
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfoByBV");
return data.code;
}
return data.data;
}

View File

@ -0,0 +1,30 @@
import networkDelegate, { type RequestTasks } from "@core/net/delegate.ts";
import { VideoInfoResponse } from "@core/net/bilibili";
import { PartialBy } from "@core/lib";
import { VideoSnapshotType } from "@core/drizzle";
/**
 * Thin wrappers around bilibili web API endpoints, routed through the shared
 * network delegate.
 */
export class BilibiliService {
    private static readonly videoMetadataUrl = "https://api.bilibili.com/x/web-interface/view";

    /**
     * Requests the metadata of a video by AID.
     *
     * @param aid - The video's AID.
     * @param task - Scheduler task name used to select proxies and rate limiters.
     * @returns The raw API response together with the estimated server time.
     */
    private static async getVideoMetadata(aid: number, task: RequestTasks) {
        // Refer to the class by name rather than `this` so the static methods
        // keep working when passed around detached from the class.
        const url = new URL(BilibiliService.videoMetadataUrl);
        url.searchParams.set("aid", aid.toString());
        return networkDelegate.request<VideoInfoResponse>(url.toString(), task);
    }

    /**
     * Takes a snapshot of a video's statistics using the milestone snapshot task.
     *
     * @param aid - The video's AID.
     * @returns A snapshot record without the `id` field.
     * @throws {Error} When the API answers with a non-zero `code`.
     */
    static async milestoneSnapshot(aid: number): Promise<PartialBy<VideoSnapshotType, "id">> {
        const metadata = await BilibiliService.getVideoMetadata(aid, "snapshotMilestoneVideo");
        const body = metadata.data;
        // On an API error there is no usable payload; fail with the API's own
        // code and message instead of crashing on `body.data.stat`.
        if (body.code !== 0) {
            throw new Error(`Error fetching metadata for ${aid}: ${body.code}-${body.message}`);
        }
        const stats = body.data.stat;
        return {
            aid,
            createdAt: new Date(metadata.time).toISOString(),
            views: stats.view,
            likes: stats.like,
            coins: stats.coin,
            favorites: stats.favorite,
            replies: stats.reply,
            shares: stats.share,
            danmakus: stats.danmaku
        };
    }
}

View File

@ -3,7 +3,18 @@ import { test, expect, describe } from "bun:test";
describe("proxying requests", () => {
test("Alibaba Cloud FC", async () => {
const { res } = await networkDelegate.request("https://postman-echo.com/get", "test") as any;
expect(res.headers.referer).toBe('https://www.bilibili.com/');
const { data } = (await networkDelegate.request<{
headers: Record<string, string>;
}>(
"https://postman-echo.com/get",
"test"
));
expect(data.headers.referer).toBe('https://www.bilibili.com/');
});
test("IP Proxy", async () => {
const { data } = await networkDelegate.request<{
headers: Record<string, string>;
}>("https://postman-echo.com/get", "test_ip");
expect(data.headers).toBeObject();
});
});

View File

@ -61,9 +61,9 @@ export const classifyVideosWorker = async () => {
const videos = await getUnlabelledVideos();
logger.log(`Found ${videos.length} unlabelled videos`);
const startTime = new Date().getTime();
const startTime = Date.now();
for (const aid of videos) {
const now = new Date().getTime();
const now = Date.now();
if (now - startTime > 4.2 * MINUTE) {
await lockManager.releaseLock("classifyVideos");
return 1;

View File

@ -15,16 +15,16 @@ import { NetSchedulerError } from "@core/net/delegate";
import { sql } from "@core/db/dbNew";
import { closetMilestone } from "./snapshotTick";
const snapshotTypeToTaskMap: { [key: string]: string } = {
const snapshotTypeToTaskMap = {
milestone: "snapshotMilestoneVideo",
normal: "snapshotVideo",
new: "snapshotMilestoneVideo"
};
} as const;
export const snapshotVideoWorker = async (job: Job): Promise<void> => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const type = job.data.type as "milestone" | "normal" | "new";
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const latestSnapshot = await getLatestSnapshot(sql, aid);

View File

@ -52,7 +52,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
if (currentSnapshot) {
const DELTA = 0.0001;
const viewsDiff = views - currentSnapshot.views;
const hoursDiff = (new Date().getTime() - currentSnapshot.created_at) / HOUR;
const hoursDiff = (Date.now() - currentSnapshot.created_at) / HOUR;
const speed = viewsDiff / (hoursDiff + DELTA);
const target = closetMilestone(views, true);
const viewsToIncrease = target - views;

View File

@ -33,7 +33,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => {
const snapshotsEnough = await hasAtLeast2Snapshots(sql, aid);
if (!snapshotsEnough) return 0;
const currentTimestamp = new Date().getTime();
const currentTimestamp = Date.now();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;

View File

@ -28,7 +28,7 @@ export interface SnapshotNumber {
export async function takeVideoSnapshot(
sql: Psql,
aid: number,
task: string
task: "snapshotMilestoneVideo" | "snapshotVideo"
): Promise<number | SnapshotNumber> {
const r = await getVideoInfo(aid, task);
if (typeof r == "number") {
@ -53,7 +53,7 @@ export async function takeVideoSnapshot(
danmakus,
replies,
aid
})
});
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");

View File

@ -1,5 +1,4 @@
import { sql } from "@core/db/dbNew";
import logger from "@core/log";
export async function removeAllTimeoutSchedules() {
return sql`

View File

@ -1,5 +1,6 @@
import { useTheme } from "next-themes";
import { Toaster as Sonner, type ToasterProps } from "sonner";
import React from "react";
const Toaster = ({ ...props }: ToasterProps) => {
const { theme = "system" } = useTheme();