diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index 0843b48..ba9fbb2 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -3,6 +3,7 @@ import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts"; import {SlidingWindow} from "lib/mq/slidingWindow.ts"; import {redis} from "lib/db/redis.ts"; import Redis from "ioredis"; +import { SECOND } from "$std/datetime/constants.ts"; interface Proxy { type: string; @@ -68,7 +69,20 @@ class NetScheduler { } removeProxy(proxyName: string): void { + if (!this.proxies[proxyName]) { + throw new Error(`Proxy ${proxyName} not found`); + } delete this.proxies[proxyName]; + // Clean up associated limiters + this.cleanupProxyLimiters(proxyName); + } + + private cleanupProxyLimiters(proxyName: string): void { + for (const limiterId in this.proxyLimiters) { + if (limiterId.startsWith(`proxy-${proxyName}`)) { + delete this.proxyLimiters[limiterId]; + } + } } addTask(taskName: string, provider: string, proxies: string[] | "all"): void { @@ -76,6 +90,9 @@ class NetScheduler { } getTaskProxies(taskName: string): string[] { + if (!this.tasks[taskName]) { + return []; + } if (this.tasks[taskName].proxies === "all") { return Object.keys(this.proxies); } @@ -115,7 +132,7 @@ class NetScheduler { } for (const proxyName of bindProxies) { const limiterId = "provider-" + proxyName + "-" + providerName; - this.providerLimiters[providerName] = new RateLimiter(limiterId, config); + this.providerLimiters[limiterId] = new RateLimiter(limiterId, config); } } @@ -167,17 +184,24 @@ class NetScheduler { throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); } - if (!force && await this.getProxyAvailability(proxyName, task) === false) { - throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED"); + if (!force) { + const isAvailable = await this.getProxyAvailability(proxyName, task); + if (!isAvailable) { + throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED"); + } } + const result = await this.makeRequest(url, proxy, method); await this.triggerLimiter(task, proxyName); + return result; + } + private async makeRequest(url: string, proxy: Proxy, method: string): Promise { switch (proxy.type) { case "native": return await this.nativeRequest(url, method); default: - throw new NetSchedulerError(`Proxy type ${proxy.type} not supported.`, "NOT_IMPLEMENTED"); + throw new NetSchedulerError(`Proxy type ${proxy.type} not supported`, "NOT_IMPLEMENTED"); } } @@ -208,7 +232,16 @@ class NetScheduler { private async nativeRequest(url: string, method: string): Promise { try { - const response = await fetch(url, { method }); + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), 10 * SECOND); + + const response = await fetch(url, { + method, + signal: controller.signal + }); + + clearTimeout(timeout); + return await response.json() as R; } catch (e) { throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);