diff --git a/.idea/data_source_mapping.xml b/.idea/data_source_mapping.xml new file mode 100644 index 0000000..e369253 --- /dev/null +++ b/.idea/data_source_mapping.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/db-forest-config.xml b/.idea/db-forest-config.xml new file mode 100644 index 0000000..91cc24e --- /dev/null +++ b/.idea/db-forest-config.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/packages/backend/routes/ping/index.ts b/packages/backend/routes/ping/index.ts index 7bea0af..f3fbdd7 100644 --- a/packages/backend/routes/ping/index.ts +++ b/packages/backend/routes/ping/index.ts @@ -15,7 +15,7 @@ export const pingHandler = new Elysia({ prefix: "/ping" }).use(ip()).get( url: request.url }, response: { - time: new Date().getTime(), + time: Date.now(), status: 200, version: VERSION } diff --git a/packages/core/db/snapshots/milestone.ts b/packages/core/db/snapshots/milestone.ts index e07c876..f859427 100644 --- a/packages/core/db/snapshots/milestone.ts +++ b/packages/core/db/snapshots/milestone.ts @@ -10,7 +10,7 @@ export const getGroundTruthMilestoneETA = async ( const DELTA = 1e-5; let minETAHours = Infinity; const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; - const currentTimestamp = new Date().getTime(); + const currentTimestamp = Date.now(); const latestSnapshot = await getLatestSnapshot(aid); const latestSnapshotTime = new Date(latestSnapshot.time).getTime(); for (const timeInterval of timeIntervals) { diff --git a/packages/core/net/bilibili.d.ts b/packages/core/net/bilibili.d.ts index 17c6339..8abdf81 100644 --- a/packages/core/net/bilibili.d.ts +++ b/packages/core/net/bilibili.d.ts @@ -7,7 +7,6 @@ interface BaseResponse { export type VideoListResponse = BaseResponse; export type VideoDetailsResponse = BaseResponse; -export type VideoTagsResponse = BaseResponse; export type VideoInfoResponse = BaseResponse; export type MediaListInfoResponse = BaseResponse; diff --git a/packages/core/net/delegate.ts b/packages/core/net/delegate.ts index 8c3f7d6..6292713 100644 --- a/packages/core/net/delegate.ts +++ b/packages/core/net/delegate.ts @@ -1,3 +1,5 @@ +// noinspection ExceptionCaughtLocallyJS + import logger from "@core/log"; import { MultipleRateLimiter, @@ -13,7 +15,7 @@ import Stream from "@alicloud/darabonba-stream"; import * as Util from "@alicloud/tea-util"; import { Readable } from "stream"; -type ProxyType = "native" | "alicloud-fc" | "baidu-cfc"; +type ProxyType = "native" | "alicloud-fc" | "ip-proxy"; interface FCResponse { statusCode: number; @@ -21,18 +23,71 @@ interface FCResponse { serverTime: number; } -interface Proxy { +interface NativeProxyData { +} + +interface AlicloudFcProxyData { + region: string; + timeout?: number; +} + +// New IP proxy system interfaces +interface IPEntry { + address: string; + /* + Lifespan of this IP addressin milliseconds + */ + lifespan: number; + port?: number; + /* + When this IP was created, UNIX timestamp in milliseconds + */ + createdAt: number; + used: boolean; +} + +type IPExtractor = () => Promise; + +type IPRotationStrategy = "single-use" | "round-robin" | "random"; + +interface IPProxyConfig { + extractor: IPExtractor; + strategy?: IPRotationStrategy; // defaults to "single-use" + minPoolSize?: number; // minimum IPs to maintain (default: 5) + maxPoolSize?: number; // maximum IPs to cache (default: 50) + refreshInterval?: number; // how often to check for new IPs (default: 30s) + initialPoolSize?: number; // how many IPs to fetch initially (default: 10) +} + +type ProxyData = NativeProxyData | AlicloudFcProxyData | IPProxyConfig; + +interface ProxyDef { type: ProxyType; - data: string; + data: T; } -interface Task { - provider: string; - proxies: string[] | "all"; +function isAlicloudFcProxy(proxy: ProxyDef): proxy is ProxyDef { + return proxy.type === "alicloud-fc"; } -interface ProxiesMap { - [name: string]: Proxy; +function isIpProxy(proxy: ProxyDef): proxy is ProxyDef { + return proxy.type === "ip-proxy"; +} + +interface ProviderDef { + limiters: readonly RateLimiterConfig[]; +} + +interface TaskDef { + provider: ProviderKeys; + proxies: readonly ProxyKeys[] | "all"; + limiters?: readonly RateLimiterConfig[]; +} + +interface NetworkConfig { + proxies: Record; + providers: Record; + tasks: Record>; } type NetworkDelegateErrorCode = @@ -41,7 +96,9 @@ type NetworkDelegateErrorCode = | "PROXY_NOT_FOUND" | "FETCH_ERROR" | "NOT_IMPLEMENTED" - | "ALICLOUD_PROXY_ERR"; + | "ALICLOUD_PROXY_ERR" + | "IP_POOL_EXHAUSTED" + | "IP_EXTRACTION_FAILED"; export class NetSchedulerError extends Error { public code: NetworkDelegateErrorCode; @@ -54,34 +111,153 @@ export class NetSchedulerError extends Error { } } -type LimiterMap = { - [name: string]: MultipleRateLimiter; -}; - -type OptionalLimiterMap = { - [name: string]: MultipleRateLimiter | null; -}; - -type TaskMap = { - [name: string]: Task; -}; - -function shuffleArray(array: T[]): T[] { - const newArray = [...array]; // Create a shallow copy to avoid in-place modification +function shuffleArray(array: readonly T[]): T[] { + const newArray = [...array]; for (let i = newArray.length - 1; i > 0; i--) { const j = Math.floor(Math.random() * (i + 1)); - [newArray[i], newArray[j]] = [newArray[j], newArray[i]]; // Swap elements + [newArray[i], newArray[j]] = [newArray[j], newArray[i]]; } return newArray; } +class IPPoolManager { + private pool: IPEntry[] = []; + private readonly config: Required; + protected refreshTimer: NodeJS.Timeout; + private isRefreshing = false; + + constructor(config: IPProxyConfig) { + this.config = { + extractor: config.extractor, + strategy: config.strategy ?? "single-use", + minPoolSize: config.minPoolSize ?? 5, + maxPoolSize: config.maxPoolSize ?? 50, + refreshInterval: config.refreshInterval ?? 30_000, + initialPoolSize: config.initialPoolSize ?? 10 + }; + } + + async initialize(): Promise { + await this.refreshPool(); + this.startPeriodicRefresh(); + } + + private startPeriodicRefresh(): void { + this.refreshTimer = setInterval(async () => { + await this.refreshPool(); + }, this.config.refreshInterval); + } + + async getNextIP(): Promise { + // Clean expired IPs first + this.cleanExpiredIPs(); + + // Try to get available IP based on strategy + let selectedIP: IPEntry | null = null; + + switch (this.config.strategy) { + case "single-use": + selectedIP = this.getAvailableIP(); + break; + case "round-robin": + selectedIP = this.getRoundRobinIP(); + break; + case "random": + selectedIP = this.getRandomIP(); + break; + } + + // If no IP available and pool is low, try to refresh + if (!selectedIP && this.pool.length < this.config.minPoolSize) { + await this.refreshPool(); + selectedIP = this.getAvailableIP(); + } + + return selectedIP; + } + + private getAvailableIP(): IPEntry | null { + const availableIPs = this.pool.filter((ip) => !ip.used); + if (availableIPs.length === 0) return null; + + // For single-use, mark IP as used immediately + const selectedIP = availableIPs[0]; + selectedIP.used = true; + return selectedIP; + } + + private getRoundRobinIP(): IPEntry | null { + const availableIPs = this.pool.filter((ip) => !ip.used); + if (availableIPs.length === 0) return null; + + const selectedIP = availableIPs[0]; + selectedIP.used = true; + return selectedIP; + } + + private getRandomIP(): IPEntry | null { + const availableIPs = this.pool.filter((ip) => !ip.used); + if (availableIPs.length === 0) return null; + + const randomIndex = Math.floor(Math.random() * availableIPs.length); + const selectedIP = availableIPs[randomIndex]; + selectedIP.used = true; + return selectedIP; + } + + private cleanExpiredIPs(): void { + const now = Date.now(); + this.pool = this.pool.filter((ip) => { + const expiryTime = ip.createdAt + ip.lifespan; + return expiryTime > now; + }); + } + + private async refreshPool(): Promise { + if (this.isRefreshing) return; + + this.isRefreshing = true; + try { + logger.debug("Refreshing IP pool", "net", "IPPoolManager.refreshPool"); + + const extractedIPs = await this.config.extractor(); + const newIPs = extractedIPs.slice(0, this.config.maxPoolSize - this.pool.length); + + // Add new IPs to pool + for (const ipData of newIPs) { + const ipEntry: IPEntry = { + ...ipData, + createdAt: Date.now(), + used: false + }; + this.pool.push(ipEntry); + } + + logger.debug( + `IP pool refreshed. Pool size: ${this.pool.length}`, + "net", + "IPPoolManager.refreshPool" + ); + } catch (error) { + logger.error(error as Error, "net", "IPPoolManager.refreshPool"); + } finally { + this.isRefreshing = false; + } + } + + async markIPUsed(address: string): Promise { + const ip = this.pool.find((p) => p.address === address); + if (ip) { + ip.used = true; + } + } +} + const getEndpoint = (region: string) => `fcv3.cn-${region}.aliyuncs.com`; const getAlicloudClient = (region: string) => { const credential = new Credential(); - const config = new OpenApi.Config({ - credential: credential - }); + const config = new OpenApi.Config({ credential: credential }); config.endpoint = getEndpoint(region); return new FC20230330(config); }; @@ -94,94 +270,121 @@ const streamToString = async (readableStream: Readable) => { return data; }; -class NetworkDelegate { - private proxies: ProxiesMap = {}; - private providerLimiters: LimiterMap = {}; - private proxyLimiters: OptionalLimiterMap = {}; - private tasks: TaskMap = {}; +export class NetworkDelegate { + private readonly proxies: Record; + private readonly tasks: Record; + private readonly ipPools: Record = {}; - addProxy(proxyName: string, type: ProxyType, data: string): void { - this.proxies[proxyName] = { type, data }; - } + private providerLimiters: Record = {}; + private proxyLimiters: Record = {}; - addTask(taskName: string, provider: string, proxies: string[] | "all"): void { - this.tasks[taskName] = { provider, proxies }; - } + constructor(config: C) { + this.proxies = config.proxies; + this.tasks = {}; + this.ipPools = {}; - getTaskProxies(taskName: string): string[] { - if (!this.tasks[taskName]) { - return []; + // Initialize IP pools for ip-proxy configurations + for (const [proxyName, proxyDef] of Object.entries(this.proxies)) { + if (isIpProxy(proxyDef)) { + this.ipPools[proxyName] = new IPPoolManager(proxyDef.data); + // Initialize asynchronously but don't wait + this.ipPools[proxyName].initialize().catch(error => { + logger.error(error as Error, "net", `Failed to initialize IP pool for ${proxyName}`); + }); + } } - if (this.tasks[taskName].proxies === "all") { - return Object.keys(this.proxies); - } - return this.tasks[taskName].proxies; - } - setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void { - const proxies = this.getTaskProxies(taskName); - for (const proxyName of proxies) { - const limiterId = "proxy-" + proxyName + "-" + taskName; - this.proxyLimiters[limiterId] = config - ? new MultipleRateLimiter(limiterId, config) - : null; + const allProxyNames = Object.keys(this.proxies); + + for (const [taskName, taskDef] of Object.entries(config.tasks)) { + const targetProxies = + taskDef.proxies === "all" ? allProxyNames : (taskDef.proxies as readonly string[]); + + for (const p of targetProxies) { + if (!this.proxies[p]) { + throw new Error(`Task ${taskName} references missing proxy: ${p}`); + } + } + + this.tasks[taskName] = { + provider: taskDef.provider, + proxies: [...targetProxies] + }; + + if (taskDef.limiters && taskDef.limiters.length > 0) { + for (const proxyName of targetProxies) { + const limiterId = `proxy-${proxyName}-${taskName}`; + this.proxyLimiters[limiterId] = new MultipleRateLimiter(limiterId, [ + ...taskDef.limiters + ]); + } + } + } + + for (const [providerName, providerDef] of Object.entries(config.providers)) { + if (!providerDef.limiters || providerDef.limiters.length === 0) continue; + + const boundProxies = new Set(); + for (const [_taskName, taskImpl] of Object.entries(this.tasks)) { + if (taskImpl.provider === providerName) { + taskImpl.proxies.forEach((p) => boundProxies.add(p)); + } + } + + for (const proxyName of boundProxies) { + const limiterId = `provider-${proxyName}-${providerName}`; + if (!this.providerLimiters[limiterId]) { + this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, [ + ...providerDef.limiters + ]); + } + } } } - async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise { - const limiterId = "proxy-" + proxy + "-" + task; - const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider; + private async triggerLimiter( + taskName: string, + proxyName: string, + force: boolean = false + ): Promise { + const taskImpl = this.tasks[taskName]; + if (!taskImpl) return; + + const proxyLimiterId = `proxy-${proxyName}-${taskName}`; + const providerLimiterId = `provider-${proxyName}-${taskImpl.provider}`; + try { - await this.proxyLimiters[limiterId]?.trigger(!force); - await this.providerLimiters[providerLimiterId]?.trigger(!force); + if (this.proxyLimiters[proxyLimiterId]) { + await this.proxyLimiters[proxyLimiterId].trigger(!force); + } + if (this.providerLimiters[providerLimiterId]) { + await this.providerLimiters[providerLimiterId].trigger(!force); + } } catch (e) { const error = e as Error; if (e instanceof ReplyError) { logger.error(error, "redis", "fn:triggerLimiter"); } else if (e instanceof RateLimiterError) { - // Re-throw it to ensure this.request can catch it throw e; + } else { + logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest"); } - logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest"); } } - setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void { - let bindProxies: string[] = []; - for (const taskName in this.tasks) { - if (this.tasks[taskName].provider !== providerName) continue; - const proxies = this.getTaskProxies(taskName); - bindProxies = bindProxies.concat(proxies); - } - for (const proxyName of bindProxies) { - const limiterId = "provider-" + proxyName + "-" + providerName; - this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, config); - } - } + async request(url: string, task: keyof C["tasks"]): Promise<{ data: R; time: number }> { + const taskName = task as string; + const taskImpl = this.tasks[taskName]; + + if (!taskImpl) { + throw new Error(`Task definition missing for ${taskName}`); + } + + const proxiesNames = taskImpl.proxies; - /* - * Make a request to the specified URL with any available proxy - * @param {string} url - The URL to request. - * @param {string} method - The HTTP method to use for the request. Default is "GET". - * @returns {Promise} - A promise that resolves to the response body. - * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` - * - The native `fetch` function threw an error: with error code `FETCH_ERROR` - * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` - * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` - */ - async request( - url: string, - task: string - ): Promise<{ - data: R; - time: number; - }> { - // find a available proxy - const proxiesNames = this.getTaskProxies(task); for (const proxyName of shuffleArray(proxiesNames)) { try { - return await this.proxyRequest(url, proxyName, task); + return await this.proxyRequest(url, proxyName, taskName); } catch (e) { if (e instanceof RateLimiterError) { continue; @@ -192,30 +395,12 @@ class NetworkDelegate { throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE"); } - /* - * Make a request to the specified URL with the specified proxy - * @param {string} url - The URL to request. - * @param {string} proxyName - The name of the proxy to use. - * @param {string} task - The name of the task to use. - * @param {string} method - The HTTP method to use for the request. Default is "GET". - * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false. - * @returns {Promise} - A promise that resolves to the response body. - * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - Proxy not found: with error code `PROXY_NOT_FOUND` - * - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED` - * - The native `fetch` function threw an error: with error code `FETCH_ERROR` - * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` - * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` - */ async proxyRequest( url: string, proxyName: string, task: string, force: boolean = false - ): Promise<{ - data: R; - time: number; - }> { + ): Promise<{ data: R; time: number }> { const proxy = this.proxies[proxyName]; if (!proxy) { throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); @@ -225,18 +410,20 @@ class NetworkDelegate { return this.makeRequest(url, proxy); } - private async makeRequest( - url: string, - proxy: Proxy - ): Promise<{ - data: R; - time: number; - }> { + private async makeRequest(url: string, proxy: ProxyDef): Promise<{ data: R; time: number }> { switch (proxy.type) { case "native": return await this.nativeRequest(url); case "alicloud-fc": + if (!isAlicloudFcProxy(proxy)) { + throw new NetSchedulerError("Invalid alicloud-fc proxy configuration", "ALICLOUD_PROXY_ERR"); + } return await this.alicloudFcRequest(url, proxy.data); + case "ip-proxy": + if (!isIpProxy(proxy)) { + throw new NetSchedulerError("Invalid ip-proxy configuration", "NOT_IMPLEMENTED"); + } + return await this.ipProxyRequest(url, proxy); default: throw new NetSchedulerError( `Proxy type ${proxy.type} not supported`, @@ -245,28 +432,19 @@ class NetworkDelegate { } } - private async nativeRequest(url: string): Promise<{ - data: R; - time: number; - }> { + private async nativeRequest(url: string): Promise<{ data: R; time: number }> { try { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), 10 * SECOND); - const response = await fetch(url, { - signal: controller.signal - }); - + const response = await fetch(url, { signal: controller.signal }); clearTimeout(timeout); const start = Date.now(); const data = await response.json(); const end = Date.now(); const serverTime = start + (end - start) / 2; - return { - data: data as R, - time: serverTime - }; + return { data: data as R, time: serverTime }; } catch (e) { throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e); } @@ -274,37 +452,33 @@ class NetworkDelegate { private async alicloudFcRequest( url: string, - region: string - ): Promise<{ - data: R; - time: number; - }> { + proxyData: AlicloudFcProxyData + ): Promise<{ data: R; time: number }> { try { - const client = getAlicloudClient(region); + const client = getAlicloudClient(proxyData.region); const bodyStream = Stream.readFromString(JSON.stringify({ url: url })); const headers = new $FC20230330.InvokeFunctionHeaders({}); - const request = new $FC20230330.InvokeFunctionRequest({ - body: bodyStream - }); + const request = new $FC20230330.InvokeFunctionRequest({ body: bodyStream }); const runtime = new Util.RuntimeOptions({}); + const response = await client.invokeFunctionWithOptions( - `proxy-${region}`, + `proxy-${proxyData.region}`, request, headers, runtime ); + if (response.statusCode !== 200) { - // noinspection ExceptionCaughtLocallyJS throw new NetSchedulerError( - `Error proxying ${url} to ali-fc region ${region}, code: ${response.statusCode} (Not correctly invoked).`, + `Error proxying ${url} to ali-fc region ${proxyData.region}, code: ${response.statusCode}`, "ALICLOUD_PROXY_ERR" ); } + const rawData = JSON.parse(await streamToString(response.body)) as FCResponse; if (rawData.statusCode !== 200) { - // noinspection ExceptionCaughtLocallyJS throw new NetSchedulerError( - `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}. (fetch error)`, + `Error proxying ${url} to ali-fc region ${proxyData.region}, remote code: ${rawData.statusCode}`, "ALICLOUD_PROXY_ERR" ); } else { @@ -316,102 +490,177 @@ class NetworkDelegate { } catch (e) { logger.error(e as Error, "net", "fn:alicloudFcRequest"); throw new NetSchedulerError( - `Unhandled error: Cannot proxy ${url} to ali-fc-${region}.`, + `Unhandled error: Cannot proxy ${url} to ali-fc-${proxyData.region}.`, "ALICLOUD_PROXY_ERR", e ); } } + + private async ipProxyRequest( + url: string, + proxyDef: ProxyDef + ): Promise<{ data: R; time: number }> { + const proxyName = Object.entries(this.proxies).find(([_, proxy]) => proxy === proxyDef)?.[0]; + if (!proxyName || !this.ipPools[proxyName]) { + throw new NetSchedulerError("IP pool not found", "IP_POOL_EXHAUSTED"); + } + + const ipPool = this.ipPools[proxyName]; + const ipEntry = await ipPool.getNextIP(); + + if (!ipEntry) { + throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED"); + } + + try { + const controller = new AbortController(); + const now = Date.now(); + const timeout = setTimeout(() => controller.abort(), ipEntry.lifespan - (now - ipEntry.createdAt)); + + const response = await fetch(url, { + signal: controller.signal, + proxy: `http://${ipEntry.address}:${ipEntry.port}` + }); + + clearTimeout(timeout); + + const start = Date.now(); + const data = await response.json(); + const end = Date.now(); + const serverTime = start + (end - start) / 2; + + // Mark IP as used + await ipPool.markIPUsed(ipEntry.address); + + return { data: data as R, time: serverTime }; + } catch (error) { + // Mark IP as used even if request failed (single-use strategy) + await ipPool.markIPUsed(ipEntry.address); + throw new NetSchedulerError("IP proxy request failed", "IP_EXTRACTION_FAILED", error); + } + } } -const networkDelegate = new NetworkDelegate(); -const videoInfoRateLimiterConfig: RateLimiterConfig[] = [ - { - duration: 0.3, - max: 1 - }, - { - duration: 3, - max: 5 - }, - { - duration: 30, - max: 30 - }, - { - duration: 2 * 60, - max: 50 - } -]; const biliLimiterConfig: RateLimiterConfig[] = [ - { - duration: 1, - max: 20 - }, - { - duration: 15, - max: 130 - }, - { - duration: 5 * 60, - max: 2000 - } + { duration: 1, max: 20 }, + { duration: 15, max: 130 }, + { duration: 5 * 60, max: 2000 } ]; -const bili_normal = [...biliLimiterConfig]; +const bili_normal = structuredClone(biliLimiterConfig); bili_normal[0].max = 5; bili_normal[1].max = 40; bili_normal[2].max = 200; -const bili_strict = [...biliLimiterConfig]; +const bili_strict = structuredClone(biliLimiterConfig); bili_strict[0].max = 1; bili_strict[1].max = 6; bili_strict[2].max = 100; -/* -Execution order for setup: +const aliRegions = ["hangzhou"] as const; -1. addProxy(proxyName, type, data): - - Must be called first. Registers proxies in the system, making them available for tasks. - - Define all proxies before proceeding to define tasks or set up limiters. -2. addTask(taskName, provider, proxies): - - Call after addProxy. Defines tasks and associates them with providers and proxies. - - Relies on proxies being already added. - - Must be called before setting task-specific or provider-specific limiters. -3. setTaskLimiter(taskName, config): - - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies. - - Depends on tasks and proxies being defined to apply limiters correctly. -4. setProviderLimiter(providerName, config): - - Call after addProxy and addTask. - - It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. - - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to. +const proxies = { + native: { + type: "native" as const, + data: {} + }, -In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter). -The order of setTaskLimiter and setProviderLimiter relative to each other is flexible, -but both should come after addProxy and addTask to ensure proper setup and dependencies are met. -*/ + alicloud_hangzhou: { + type: "alicloud-fc" as const, + data: { + region: "hangzhou", + timeout: 15000 + } + }, -const aliRegions = ["beijing", "hangzhou"]; -const fcProxies = aliRegions.map((region) => `alicloud-${region}`); -const fcProxiesL = aliRegions.slice(1).map((region) => `alicloud-${region}`); -networkDelegate.addProxy("native", "native", ""); -for (const region of aliRegions) { - networkDelegate.addProxy(`alicloud-${region}`, "alicloud-fc", region); -} + ip_proxy_pool: { + type: "ip-proxy" as const, + data: { + extractor: async (): Promise => { + interface APIResponse { + code: number; + data: { + ip: string; + port: number; + endtime: string; + city: string; + }[] + } + const url = Bun.env.IP_PROXY_EXTRACTOR_URL; + const response = await fetch(url); + const data = await response.json() as APIResponse; + if (data.code !== 0){ + throw new Error(`IP proxy extractor failed with code ${data.code}`); + } + const ips = data.data; + return ips.map((item) => { + return { + address: item.ip, + port: item.port, + lifespan: Date.parse(item.endtime+'+08') - Date.now(), + createdAt: Date.now(), + used: false + } + }) + }, + strategy: "round-robin", + minPoolSize: 10, + maxPoolSize: 500, + refreshInterval: 5 * SECOND, + initialPoolSize: 10 + } + } +} satisfies Record; -networkDelegate.addTask("test", "test", "all"); -networkDelegate.addTask("getVideoInfo", "bilibili", "all"); -networkDelegate.addTask("getLatestVideos", "bilibili", "all"); -networkDelegate.addTask("snapshotMilestoneVideo", "bilibili", fcProxies); -networkDelegate.addTask("snapshotVideo", "bilibili", fcProxiesL); -networkDelegate.addTask("bulkSnapshot", "bilibili", fcProxiesL); +type MyProxyKeys = keyof typeof proxies; -networkDelegate.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig); -networkDelegate.setTaskLimiter("bulkSnapshot", bili_strict); -networkDelegate.setTaskLimiter("getLatestVideos", bili_strict); -networkDelegate.setTaskLimiter("getVideoInfo", bili_strict); -networkDelegate.setTaskLimiter("snapshotVideo", bili_normal); -networkDelegate.setProviderLimiter("test", []); -networkDelegate.setProviderLimiter("bilibili", biliLimiterConfig); +const fcProxies = aliRegions.map((region) => `alicloud_${region}`) as MyProxyKeys[]; + +const config = { + proxies: proxies, + providers: { + test: { limiters: [] }, + bilibili: { limiters: biliLimiterConfig } + }, + tasks: { + test: { + provider: "test", + proxies: fcProxies + }, + test_ip: { + provider: "test", + proxies: ["ip_proxy_pool"] + }, + getVideoInfo: { + provider: "bilibili", + proxies: "all", + limiters: bili_strict + }, + getLatestVideos: { + provider: "bilibili", + proxies: "all", + limiters: bili_strict + }, + snapshotMilestoneVideo: { + provider: "bilibili", + proxies: ["ip_proxy_pool"] + }, + snapshotVideo: { + provider: "bilibili", + proxies: ["ip_proxy_pool"], + limiters: bili_normal + }, + bulkSnapshot: { + provider: "bilibili", + proxies: ["ip_proxy_pool"], + limiters: bili_strict + } + } +} as const satisfies NetworkConfig; + +export type RequestTasks = keyof typeof config.tasks; + +const networkDelegate = new NetworkDelegate(config); export default networkDelegate; diff --git a/packages/core/net/getVideoDetails.ts b/packages/core/net/getVideoDetails.ts index 67f5b5d..696478b 100644 --- a/packages/core/net/getVideoDetails.ts +++ b/packages/core/net/getVideoDetails.ts @@ -2,15 +2,9 @@ import networkDelegate from "@core/net/delegate"; import type { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d"; import logger from "@core/log"; -export async function getVideoDetails( - aid: number, - archive: boolean = false -): Promise { +export async function getVideoDetails(aid: number): Promise { const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; - const { data } = await networkDelegate.request( - url, - archive ? "" : "getVideoInfo" - ); + const { data } = await networkDelegate.request(url, "getVideoInfo"); const errMessage = `Error fetching metadata for ${aid}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo"); diff --git a/packages/core/net/getVideoInfo.ts b/packages/core/net/getVideoInfo.ts index cfc6d3b..c9bf91e 100644 --- a/packages/core/net/getVideoInfo.ts +++ b/packages/core/net/getVideoInfo.ts @@ -17,7 +17,7 @@ import logger from "@core/log"; */ export async function getVideoInfo( aid: number, - task: string + task: "snapshotVideo" | "getVideoInfo" | "snapshotMilestoneVideo" ): Promise< | { data: VideoInfoData; @@ -37,30 +37,3 @@ export async function getVideoInfo( time: time }; } - -/* - * Fetch video metadata from bilibili API by BVID - * @param {string} bvid - The video's BVID - * @param {string} task - The task name used in scheduler. It can be one of the following: - * - snapshotVideo - * - getVideoInfo - * - snapshotMilestoneVideo - * @returns {Promise} VideoInfoData or the error code returned by bilibili API - * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` - * - The native `fetch` function threw an error: with error code `FETCH_ERROR` - * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` - */ -export async function getVideoInfoByBV( - bvid: string, - task: string -): Promise { - const url = `https://api.bilibili.com/x/web-interface/view?bvid=${bvid}`; - const { data } = await networkDelegate.request(url, task); - const errMessage = `Error fetching metadata for ${bvid}:`; - if (data.code !== 0) { - logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfoByBV"); - return data.code; - } - return data.data; -} diff --git a/packages/core/net/services.ts b/packages/core/net/services.ts new file mode 100644 index 0000000..43372a3 --- /dev/null +++ b/packages/core/net/services.ts @@ -0,0 +1,30 @@ +import networkDelegate, { type RequestTasks } from "@core/net/delegate.ts"; +import { VideoInfoResponse } from "@core/net/bilibili"; +import { PartialBy } from "@core/lib"; +import { VideoSnapshotType } from "@core/drizzle"; + +export class BilibiliService { + private static videoMetadataUrl = "https://api.bilibili.com/x/web-interface/view"; + + private static async getVideoMetadata(aid: number, task: RequestTasks) { + const url = new URL(this.videoMetadataUrl); + url.searchParams.set("aid", aid.toString()); + return networkDelegate.request(url.toString(), task); + } + + static async milestoneSnapshot(aid: number): Promise> { + const metadata = await this.getVideoMetadata(aid, "snapshotMilestoneVideo"); + const stats = metadata.data.data.stat; + return { + aid, + createdAt: new Date(metadata.time).toISOString(), + views: stats.view, + likes: stats.like, + coins: stats.coin, + favorites: stats.favorite, + replies: stats.reply, + shares: stats.share, + danmakus: stats.danmaku + }; + } +} diff --git a/packages/core/test/netDelegate.test.ts b/packages/core/test/netDelegate.test.ts index d531b3c..e58485a 100644 --- a/packages/core/test/netDelegate.test.ts +++ b/packages/core/test/netDelegate.test.ts @@ -3,7 +3,18 @@ import { test, expect, describe } from "bun:test"; describe("proxying requests", () => { test("Alibaba Cloud FC", async () => { - const { res } = await networkDelegate.request("https://postman-echo.com/get", "test") as any; - expect(res.headers.referer).toBe('https://www.bilibili.com/'); + const { data } = (await networkDelegate.request<{ + headers: Record; + }>( + "https://postman-echo.com/get", + "test" + )); + expect(data.headers.referer).toBe('https://www.bilibili.com/'); + }); + test("IP Proxy", async () => { + const { data } = await networkDelegate.request<{ + headers: Record; + }>("https://postman-echo.com/get", "test_ip"); + expect(data.headers).toBeObject(); }); }); diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index b361136..883bc1a 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -61,9 +61,9 @@ export const classifyVideosWorker = async () => { const videos = await getUnlabelledVideos(); logger.log(`Found ${videos.length} unlabelled videos`); - const startTime = new Date().getTime(); + const startTime = Date.now(); for (const aid of videos) { - const now = new Date().getTime(); + const now = Date.now(); if (now - startTime > 4.2 * MINUTE) { await lockManager.releaseLock("classifyVideos"); return 1; diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index d386bad..4b18537 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -15,16 +15,16 @@ import { NetSchedulerError } from "@core/net/delegate"; import { sql } from "@core/db/dbNew"; import { closetMilestone } from "./snapshotTick"; -const snapshotTypeToTaskMap: { [key: string]: string } = { +const snapshotTypeToTaskMap = { milestone: "snapshotMilestoneVideo", normal: "snapshotVideo", new: "snapshotMilestoneVideo" -}; +} as const; export const snapshotVideoWorker = async (job: Job): Promise => { const id = job.data.id; const aid = Number(job.data.aid); - const type = job.data.type; + const type = job.data.type as "milestone" | "normal" | "new"; const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo"; const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; const latestSnapshot = await getLatestSnapshot(sql, aid); diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index 735d7fe..e62b7d1 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -52,7 +52,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { if (currentSnapshot) { const DELTA = 0.0001; const viewsDiff = views - currentSnapshot.views; - const hoursDiff = (new Date().getTime() - currentSnapshot.created_at) / HOUR; + const hoursDiff = (Date.now() - currentSnapshot.created_at) / HOUR; const speed = viewsDiff / (hoursDiff + DELTA); const target = closetMilestone(views, true); const viewsToIncrease = target - views; diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index 8dbf11b..e267b8a 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ -33,7 +33,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { const snapshotsEnough = await hasAtLeast2Snapshots(sql, aid); if (!snapshotsEnough) return 0; - const currentTimestamp = new Date().getTime(); + const currentTimestamp = Date.now(); const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; diff --git a/packages/crawler/mq/task/getVideoStats.ts b/packages/crawler/mq/task/getVideoStats.ts index c6d02e0..cdfe02c 100644 --- a/packages/crawler/mq/task/getVideoStats.ts +++ b/packages/crawler/mq/task/getVideoStats.ts @@ -28,7 +28,7 @@ export interface SnapshotNumber { export async function takeVideoSnapshot( sql: Psql, aid: number, - task: string + task: "snapshotMilestoneVideo" | "snapshotVideo" ): Promise { const r = await getVideoInfo(aid, task); if (typeof r == "number") { @@ -53,7 +53,7 @@ export async function takeVideoSnapshot( danmakus, replies, aid - }) + }); logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); diff --git a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts index 29f1da1..23149f6 100644 --- a/packages/crawler/mq/task/removeAllTimeoutSchedules.ts +++ b/packages/crawler/mq/task/removeAllTimeoutSchedules.ts @@ -1,5 +1,4 @@ import { sql } from "@core/db/dbNew"; -import logger from "@core/log"; export async function removeAllTimeoutSchedules() { return sql` diff --git a/packages/temp_frontend/app/components/ui/sonner.tsx b/packages/temp_frontend/app/components/ui/sonner.tsx index ec702d9..0e480b8 100644 --- a/packages/temp_frontend/app/components/ui/sonner.tsx +++ b/packages/temp_frontend/app/components/ui/sonner.tsx @@ -1,5 +1,6 @@ import { useTheme } from "next-themes"; import { Toaster as Sonner, type ToasterProps } from "sonner"; +import React from "react"; const Toaster = ({ ...props }: ToasterProps) => { const { theme = "system" } = useTheme();