diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts
index 14ed12f..0843b48 100644
--- a/lib/mq/scheduler.ts
+++ b/lib/mq/scheduler.ts
@@ -1,14 +1,17 @@
 import logger from "lib/log/logger.ts";
-import {RateLimiter} from "lib/mq/rateLimiter.ts";
+import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
 import {SlidingWindow} from "lib/mq/slidingWindow.ts";
 import {redis} from "lib/db/redis.ts";
 import Redis from "ioredis";
 
 interface Proxy {
 	type: string;
-	task: string;
+	data: string;
+}
+
+interface Task {
 	provider: string;
-	limiter?: RateLimiter;
+	proxies: string[] | "all";
 }
 
 interface ProxiesMap {
@@ -33,28 +36,105 @@ export class NetSchedulerError extends Error {
 	}
 }
 
-interface LimiterMap {
+type LimiterMap = {
 	[name: string]: RateLimiter;
+};
+
+type OptionalLimiterMap = {
+	[name: string]: RateLimiter | null;
+};
+
+type TaskMap = {
+	[name: string]: Task;
+};
+
+/** Returns a shuffled copy of `array` (Fisher-Yates swap loop); the input array is not modified. */
+function shuffleArray<T>(array: T[]): T[] {
+	const newArray = [...array]; // Create a shallow copy to avoid in-place modification
+	for (let i = newArray.length - 1; i > 0; i--) {
+		const j = Math.floor(Math.random() * (i + 1));
+		[newArray[i], newArray[j]] = [newArray[j], newArray[i]]; // Swap elements
+	}
+	return newArray;
 }
 
 class NetScheduler {
 	private proxies: ProxiesMap = {};
+	// Keyed by "provider-<proxyName>-<providerName>".
 	private providerLimiters: LimiterMap = {};
+	// Keyed by "proxy-<proxyName>-<taskName>"; null marks a pair without a limit.
+	private proxyLimiters: OptionalLimiterMap = {};
+	private tasks: TaskMap = {};
 
-	addProxy(name: string, type: string, task: string, provider: string): void {
-		this.proxies[name] = { type, task, provider };
+	addProxy(proxyName: string, type: string, data: string): void {
+		this.proxies[proxyName] = { type, data };
 	}
 
-	removeProxy(name: string): void {
-		delete this.proxies[name];
+	removeProxy(proxyName: string): void {
+		delete this.proxies[proxyName];
 	}
 
-	setProxyLimiter(name: string, limiter: RateLimiter): void {
-		this.proxies[name].limiter = limiter;
+	/** Registers a task served by `provider`; `proxies` lists usable proxy names ("all" = every proxy). */
+	addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
+		this.tasks[taskName] = { provider, proxies };
 	}
 
-	setProviderLimiter(name: string, limiter: RateLimiter): void {
-		this.providerLimiters[name] = limiter;
+	/** Resolves the proxy names usable by a task; "all" expands to the proxies registered right now. */
+	getTaskProxies(taskName: string): string[] {
+		const { proxies } = this.tasks[taskName];
+		return proxies === "all" ? Object.keys(this.proxies) : proxies;
+	}
+
+	/**
+	 * Creates (or, when `config` is null, clears) the limiter of every proxy currently bound to the task.
+	 * Proxies registered afterwards are not covered until this is called again.
+	 */
+	setTaskLimiter(taskName: string, config: RateLimiterConfig[] | null): void {
+		const proxies = this.getTaskProxies(taskName);
+		for (const proxyName of proxies) {
+			const limiterId = "proxy-" + proxyName + "-" + taskName;
+			this.proxyLimiters[limiterId] = config ? new RateLimiter(limiterId, config) : null;
+		}
+	}
+
+	/**
+	 * Triggers the rate limiter of the given proxy/task pair, if one is configured.
+	 * Limiter failures are logged and swallowed rather than propagated to the caller.
+	 */
+	async triggerLimiter(task: string, proxy: string): Promise<void> {
+		// The key must use the same "proxy-<proxy>-<task>" format that setTaskLimiter stores under.
+		const limiter = this.proxyLimiters["proxy-" + proxy + "-" + task];
+		if (!limiter) {
+			return;
+		}
+		try {
+			await limiter.trigger();
+		} catch (e) {
+			const error = e as Error;
+			if (e instanceof Redis.ReplyError) {
+				logger.error(error, "redis");
+			}
+			logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
+		}
+	}
+
+	/**
+	 * Creates one provider limiter per proxy bound to any task of `providerName`.
+	 * Only tasks and proxies registered before this call are covered.
+	 */
+	setProviderLimiter(providerName: string, config: RateLimiterConfig[]): void {
+		const bindProxies = new Set<string>();
+		for (const taskName in this.tasks) {
+			if (this.tasks[taskName].provider !== providerName) continue;
+			for (const proxyName of this.getTaskProxies(taskName)) {
+				bindProxies.add(proxyName);
+			}
+		}
+		for (const proxyName of bindProxies) {
+			const limiterId = "provider-" + proxyName + "-" + providerName;
+			// Store under limiterId: getProxyAvailability looks the limiter up per proxy, not per provider.
+			this.providerLimiters[limiterId] = new RateLimiter(limiterId, config);
+		}
 	}
 
 	/*
@@ -70,11 +150,9 @@ class NetScheduler {
 	 */
 	async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
 		// find a available proxy
-		const proxiesNames = Object.keys(this.proxies);
-		for (const proxyName of proxiesNames) {
-			const proxy = this.proxies[proxyName];
-			if (proxy.task !== task) continue;
-			if (await this.getProxyAvailability(proxyName)) {
-				return await this.proxyRequest(url, proxyName, method);
+		const proxiesNames = this.getTaskProxies(task);
+		for (const proxyName of shuffleArray(proxiesNames)) {
+			if (await this.getProxyAvailability(proxyName, task)) {
+				return await this.proxyRequest<R>(url, proxyName, task, method);
 			}
 		}
@@ -85,6 +163,7 @@ class NetScheduler {
 	 * Make a request to the specified URL with the specified proxy
 	 * @param {string} url - The URL to request.
 	 * @param {string} proxyName - The name of the proxy to use.
+	 * @param {string} task - The name of the task whose rate limiter applies to this request.
 	 * @param {string} method - The HTTP method to use for the request. Default is "GET".
 	 * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
 	 * @returns {Promise} - A promise that resolves to the response body.
@@ -94,27 +173,23 @@ class NetScheduler {
 	 * - The native `fetch` function threw an error: with error code FETCH_ERROR
 	 * - The proxy type is not supported: with error code NOT_IMPLEMENTED
 	 */
-	async proxyRequest<R>(url: string, proxyName: string, method: string = "GET", force: boolean = false): Promise<R> {
+	async proxyRequest<R>(
+		url: string,
+		proxyName: string,
+		task: string,
+		method: string = "GET",
+		force: boolean = false,
+	): Promise<R> {
 		const proxy = this.proxies[proxyName];
 		if (!proxy) {
 			throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
 		}
 
-		if (!force && await this.getProxyAvailability(proxyName) === false) {
+		if (!force && await this.getProxyAvailability(proxyName, task) === false) {
 			throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
 		}
 
-		if (proxy.limiter) {
-			try {
-				await proxy.limiter!.trigger();
-			} catch (e) {
-				const error = e as Error;
-				if (e instanceof Redis.ReplyError) {
-					logger.error(error, "redis");
-				}
-				logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
-			}
-		}
+		await this.triggerLimiter(task, proxyName);
 
 		switch (proxy.type) {
 			case "native":
@@ -124,21 +199,20 @@ class NetScheduler {
 		}
 	}
 
-	private async getProxyAvailability(name: string): Promise<boolean> {
+	private async getProxyAvailability(proxyName: string, taskName: string): Promise<boolean> {
 		try {
-			const proxyConfig = this.proxies[name];
-			if (!proxyConfig) {
-				return true;
-			}
-			const provider = proxyConfig.provider;
-			const providerLimiter = await this.providerLimiters[provider].getAvailability();
-			if (!providerLimiter) {
+			const provider = this.tasks[taskName].provider;
+			const proxyLimiter = this.proxyLimiters["proxy-" + proxyName + "-" + taskName];
+			const providerLimiter = this.providerLimiters["provider-" + proxyName + "-" + provider];
+			// An unconfigured limiter imposes no restriction, but it must not
+			// short-circuit the check of the other limiter.
+			if (providerLimiter && !(await providerLimiter.getAvailability())) {
 				return false;
 			}
-			if (!proxyConfig.limiter) {
+			if (!proxyLimiter) {
 				return true;
 			}
-			return await proxyConfig.limiter.getAvailability();
+			return await proxyLimiter.getAvailability();
 		} catch (e) {
 			const error = e as Error;
 			if (e instanceof Redis.ReplyError) {
@@ -161,9 +235,7 @@ class NetScheduler {
 	}
 }
 const netScheduler = new NetScheduler();
-netScheduler.addProxy("default", "native", "default", "bilibili-native");
-netScheduler.addProxy("tags-native", "native", "getVideoTags", "bilibili-native");
-const tagsRateLimiter = new RateLimiter("getVideoTags", [
+const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
 	{
 		window: new SlidingWindow(redis, 1),
 		max: 3,
@@ -176,22 +248,27 @@ const tagsRateLimiter = new RateLimiter("getVideoTags", [
 		window: new SlidingWindow(redis, 2 * 60),
 		max: 50,
 	},
-]);
-const biliLimiterNative = new RateLimiter("bilibili-native", [
+];
+const biliLimiterConfig: RateLimiterConfig[] = [
 	{
 		window: new SlidingWindow(redis, 1),
-		max: 5
+		max: 5,
 	},
 	{
 		window: new SlidingWindow(redis, 30),
-		max: 100
+		max: 100,
 	},
 	{
 		window: new SlidingWindow(redis, 5 * 60),
-		max: 180
-	}
-]);
-netScheduler.setProxyLimiter("tags-native", tagsRateLimiter);
-netScheduler.setProviderLimiter("bilibili-native", biliLimiterNative)
+		max: 180,
+	},
+];
+// Order matters: limiters are bound to the proxies and tasks that exist when the setter is called.
+netScheduler.addProxy("native", "native", "");
+netScheduler.addTask("getVideoInfo", "bilibili", "all");
+netScheduler.addTask("getLatestVideos", "bilibili", "all");
+netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
+netScheduler.setTaskLimiter("getLatestVideos", null);
+netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
 
 export default netScheduler;
diff --git a/lib/net/getLatestVideos.ts b/lib/net/getLatestVideos.ts
index a3db732..b41eae5 100644
--- a/lib/net/getLatestVideos.ts
+++ b/lib/net/getLatestVideos.ts
@@ -1,29 +1,40 @@
-import { VideoListResponse, VideoListVideo } from "lib/net/bilibili.d.ts";
+import { VideoListResponse } from "lib/net/bilibili.d.ts";
 import logger from "lib/log/logger.ts";
+import netScheduler, {NetSchedulerError} from "lib/mq/scheduler.ts";
 
-export async function getLatestVideos(
-	page: number = 1,
-	pageSize: number = 10
-): Promise<VideoListVideo[] | null> {
+/**
+ * Requests one page of the "newlist" API (rid=30) through the net scheduler and returns the aids.
+ * @param page - Page number passed as `pn`; page 1 covers items 1..pageSize.
+ * @param pageSize - Number of items per page, passed as `ps`.
+ * @returns The aids of the page; an empty array when the API answers with a non-zero code
+ * or with no archives; null when the scheduler reports FETCH_ERROR.
+ * @throws Any error other than a NetSchedulerError with code FETCH_ERROR is re-thrown.
+ */
+export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[] | null> {
+	const startFrom = 1 + pageSize * (page - 1);
+	const endTo = pageSize * page;
+	const range = `${startFrom}-${endTo}`;
+	const errMessage = `Error fetching latest aid for ${range}:`;
 	try {
-		const response = await fetch(
-			`https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`,
-		);
-		const data: VideoListResponse = await response.json();
-
+		const url = `https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`;
+		const data = await netScheduler.request<VideoListResponse>(url, "getLatestVideos");
 		if (data.code !== 0) {
-			logger.error(`Error fetching videos: ${data.message}`, "net", "getLatestVideos");
-			return null;
+			logger.error(errMessage + data.message, "net", "getLatestVideos");
+			return [];
 		}
-
 		if (data.data.archives.length === 0) {
 			logger.verbose("No more videos found", "net", "getLatestVideos");
 			return [];
 		}
-
-		return data.data.archives;
-	} catch (error) {
-		logger.error(error as Error, "net", "getLatestVideos");
+		return data.data.archives.map((video) => video.aid);
+	} catch (e) {
+		// Only a failed fetch is handled here; anything else is re-thrown unchanged.
+		if (!(e instanceof NetSchedulerError) || e.code !== "FETCH_ERROR") {
+			throw e;
+		}
+		const rawError = e.rawError instanceof Error ? e.rawError : e;
+		rawError.message = errMessage + rawError.message;
+		logger.error(rawError, "net", "getLatestVideos");
 		return null;
 	}
 }