fix: incorrect key in setProviderLimiter
of Scheduler
This commit is contained in:
parent
7566722d04
commit
c67e3d8e36
@ -3,6 +3,7 @@ import {RateLimiter, RateLimiterConfig} from "lib/mq/rateLimiter.ts";
|
|||||||
import {SlidingWindow} from "lib/mq/slidingWindow.ts";
|
import {SlidingWindow} from "lib/mq/slidingWindow.ts";
|
||||||
import {redis} from "lib/db/redis.ts";
|
import {redis} from "lib/db/redis.ts";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
|
import { SECOND } from "$std/datetime/constants.ts";
|
||||||
|
|
||||||
interface Proxy {
|
interface Proxy {
|
||||||
type: string;
|
type: string;
|
||||||
@ -68,7 +69,20 @@ class NetScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
removeProxy(proxyName: string): void {
|
removeProxy(proxyName: string): void {
|
||||||
|
if (!this.proxies[proxyName]) {
|
||||||
|
throw new Error(`Proxy ${proxyName} not found`);
|
||||||
|
}
|
||||||
delete this.proxies[proxyName];
|
delete this.proxies[proxyName];
|
||||||
|
// Clean up associated limiters
|
||||||
|
this.cleanupProxyLimiters(proxyName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private cleanupProxyLimiters(proxyName: string): void {
|
||||||
|
for (const limiterId in this.proxyLimiters) {
|
||||||
|
if (limiterId.startsWith(`proxy-${proxyName}`)) {
|
||||||
|
delete this.proxyLimiters[limiterId];
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
|
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
|
||||||
@ -76,6 +90,9 @@ class NetScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
getTaskProxies(taskName: string): string[] {
|
getTaskProxies(taskName: string): string[] {
|
||||||
|
if (!this.tasks[taskName]) {
|
||||||
|
return [];
|
||||||
|
}
|
||||||
if (this.tasks[taskName].proxies === "all") {
|
if (this.tasks[taskName].proxies === "all") {
|
||||||
return Object.keys(this.proxies);
|
return Object.keys(this.proxies);
|
||||||
}
|
}
|
||||||
@ -115,7 +132,7 @@ class NetScheduler {
|
|||||||
}
|
}
|
||||||
for (const proxyName of bindProxies) {
|
for (const proxyName of bindProxies) {
|
||||||
const limiterId = "provider-" + proxyName + "-" + providerName;
|
const limiterId = "provider-" + proxyName + "-" + providerName;
|
||||||
this.providerLimiters[providerName] = new RateLimiter(limiterId, config);
|
this.providerLimiters[limiterId] = new RateLimiter(limiterId, config);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -167,17 +184,24 @@ class NetScheduler {
|
|||||||
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
|
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!force && await this.getProxyAvailability(proxyName, task) === false) {
|
if (!force) {
|
||||||
throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
|
const isAvailable = await this.getProxyAvailability(proxyName, task);
|
||||||
|
if (!isAvailable) {
|
||||||
|
throw new NetSchedulerError(`Proxy "${proxyName}" is rate limited`, "PROXY_RATE_LIMITED");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const result = await this.makeRequest<R>(url, proxy, method);
|
||||||
await this.triggerLimiter(task, proxyName);
|
await this.triggerLimiter(task, proxyName);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async makeRequest<R>(url: string, proxy: Proxy, method: string): Promise<R> {
|
||||||
switch (proxy.type) {
|
switch (proxy.type) {
|
||||||
case "native":
|
case "native":
|
||||||
return await this.nativeRequest<R>(url, method);
|
return await this.nativeRequest<R>(url, method);
|
||||||
default:
|
default:
|
||||||
throw new NetSchedulerError(`Proxy type ${proxy.type} not supported.`, "NOT_IMPLEMENTED");
|
throw new NetSchedulerError(`Proxy type ${proxy.type} not supported`, "NOT_IMPLEMENTED");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,7 +232,16 @@ class NetScheduler {
|
|||||||
|
|
||||||
private async nativeRequest<R>(url: string, method: string): Promise<R> {
|
private async nativeRequest<R>(url: string, method: string): Promise<R> {
|
||||||
try {
|
try {
|
||||||
const response = await fetch(url, { method });
|
const controller = new AbortController();
|
||||||
|
const timeout = setTimeout(() => controller.abort(), 10 * SECOND);
|
||||||
|
|
||||||
|
const response = await fetch(url, {
|
||||||
|
method,
|
||||||
|
signal: controller.signal
|
||||||
|
});
|
||||||
|
|
||||||
|
clearTimeout(timeout);
|
||||||
|
|
||||||
return await response.json() as R;
|
return await response.json() as R;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
|
throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
|
||||||
|
Loading…
Reference in New Issue
Block a user