1
0

update: error handling & metrics for IP proxies

This commit is contained in:
alikia2x (寒寒) 2025-12-14 06:06:22 +08:00
parent fc610d417b
commit ec90557be0
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS.
GPG Key ID: 56209E0CCD8420C6
12 changed files with 143 additions and 212 deletions

30
.gitignore vendored
View File

@ -16,14 +16,14 @@ node_modules/
# project specific
logs/
__pycache__
ml/filter/runs
ml/pred/runs
ml/pred/checkpoints
ml/pred/observed
ml/data/
ml/filter/checkpoints
scripts
model/
/ml/filter/runs
/ml/pred/runs
/ml/pred/checkpoints
/ml/pred/observed
/ml/data/
/ml/filter/checkpoints
/scripts
/model
.astro
@ -32,19 +32,17 @@ model/
*.db
*.sqlite
*.sqlite3
data/
redis/
/data
/redis
# Build
dist/
build/
docker-compose.yml
ucaptcha-config.yaml
temp/
meili
datasets
datasets
mutagen.yml
mutagen.yml.lock

View File

@ -3,5 +3,7 @@
<component name="DataSourcePerFileMappings">
<file url="file://$PROJECT_DIR$/packages/crawler/db/snapshot.ts" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
<file url="file://$PROJECT_DIR$/packages/crawler/mq/task/removeAllTimeoutSchedules.ts" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
<file url="file://$PROJECT_DIR$/queries/schedule_count.sql" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
<file url="file://$PROJECT_DIR$/queries/snapshots_count.sql" value="0d2dd3d3-bd27-4e5f-b0fa-ff14fb2a6bef" />
</component>
</project>

View File

@ -1,6 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="file://$PROJECT_DIR$/queries/schedule_count.sql" dialect="PostgreSQL" />
<file url="file://$PROJECT_DIR$/queries/snapshots_count.sql" dialect="PostgreSQL" />
<file url="PROJECT" dialect="PostgreSQL" />
</component>
</project>

View File

@ -1,23 +0,0 @@
FROM oven/bun:1.2.8-debian
WORKDIR /app
COPY ./packages/core ./core
COPY ./packages/backend/package.json ./packages/backend/bun.lock ./backend/
RUN apt update && apt install -y curl
RUN ln -s /bin/uname /usr/bin/uname
RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)"
WORKDIR backend
RUN bun install
COPY ./packages/backend/ .
RUN mkdir -p /app/logs
CMD ["bun", "start"]

View File

@ -1,19 +0,0 @@
FROM oven/bun:1.2.8-debian
WORKDIR /app
COPY . .
RUN bun i
RUN mkdir -p /app/logs
RUN apt update && apt install -y curl
RUN ln -s /bin/uname /usr/bin/uname
RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)"
WORKDIR packages/crawler
CMD ["bun", "all"]

View File

@ -1,23 +0,0 @@
FROM oven/bun
ARG BACKEND_URL
WORKDIR /app
COPY . .
RUN bun install
WORKDIR packages/frontend
RUN bun run build
ENV HOST=0.0.0.0
ENV PORT=4321
ENV BACKEND_URL=${BACKEND_URL}
EXPOSE 4321
RUN mkdir -p /app/logs
CMD ["bun", "/app/packages/frontend/dist/server/entry.mjs"]

View File

@ -1,14 +0,0 @@
FROM node:lts-slim AS production
WORKDIR /app
COPY ./packages/next/.next ./.next
COPY ./packages/next/public ./public
COPY ./packages/next/package.json ./package.json
COPY ./packages/next/node_modules ./node_modules
ENV NODE_ENV production
EXPOSE 7400
CMD ["npm", "start"]

View File

@ -1,71 +0,0 @@
version: '3.8'
services:
db:
image: postgres:17
ports:
- "5431:5432"
environment:
POSTGRES_USER: cvsa
POSTGRES_PASSWORD: ""
POSTGRES_DB: cvsa_main
volumes:
- ./data:/var/lib/postgresql/data
redis:
image: redis:latest
ports:
- "6378:6379"
volumes:
- ./redis/data:/data
- ./redis/redis.conf:/usr/local/etc/redis/redis.conf
- ./redis/logs:/logs
frontend:
build:
context: .
dockerfile: Dockerfile.frontend
ports:
- "4321:4321"
environment:
- HOST=0.0.0.0
- PORT=4321
- DB_HOST=db
- DB_NAME=cvsa_main
- DB_NAME_CRED=cvsa_cred
- DB_USER=cvsa
- DB_PORT=5432
- DB_PASSWORD=""
- LOG_VERBOSE=/app/logs/verbose.log
- LOG_WARN=/app/logs/warn.log
- LOG_ERR=/app/logs/error.log
depends_on:
- db
volumes:
- /path/to/your/logs:/app/logs
backend:
build:
context: .
dockerfile: Dockerfile.backend
ports:
- "8000:8000"
environment:
- HOST=0.0.0.0
- DB_HOST=db
- DB_NAME=cvsa_main
- DB_NAME_CRED=cvsa_cred
- DB_USER=cvsa
- DB_PORT=5432
- DB_PASSWORD=""
- LOG_VERBOSE=/app/logs/verbose.log
- LOG_WARN=/app/logs/warn.log
- LOG_ERR=/app/logs/error.log
- REDIS_HOST=redis
- REDIS_PORT=6379
depends_on:
- db
volumes:
- /path/to/your/logs:/app/logs
volumes:
db_data:

View File

@ -14,6 +14,7 @@ import * as OpenApi from "@alicloud/openapi-client";
import Stream from "@alicloud/darabonba-stream";
import * as Util from "@alicloud/tea-util";
import { Readable } from "stream";
import { ipProxyCounter, ipProxyErrorCounter } from "crawler/metrics";
type ProxyType = "native" | "alicloud-fc" | "ip-proxy";
@ -23,8 +24,7 @@ interface FCResponse {
serverTime: number;
}
interface NativeProxyData {
}
interface NativeProxyData {}
interface AlicloudFcProxyData {
region: string;
@ -288,8 +288,12 @@ export class NetworkDelegate<const C extends NetworkConfig> {
if (isIpProxy(proxyDef)) {
this.ipPools[proxyName] = new IPPoolManager(proxyDef.data);
// Initialize asynchronously but don't wait
this.ipPools[proxyName].initialize().catch(error => {
logger.error(error as Error, "net", `Failed to initialize IP pool for ${proxyName}`);
this.ipPools[proxyName].initialize().catch((error) => {
logger.error(
error as Error,
"net",
`Failed to initialize IP pool for ${proxyName}`
);
});
}
}
@ -416,14 +420,27 @@ export class NetworkDelegate<const C extends NetworkConfig> {
return await this.nativeRequest<R>(url);
case "alicloud-fc":
if (!isAlicloudFcProxy(proxy)) {
throw new NetSchedulerError("Invalid alicloud-fc proxy configuration", "ALICLOUD_PROXY_ERR");
throw new NetSchedulerError(
"Invalid alicloud-fc proxy configuration",
"ALICLOUD_PROXY_ERR"
);
}
return await this.alicloudFcRequest<R>(url, proxy.data);
case "ip-proxy":
if (!isIpProxy(proxy)) {
throw new NetSchedulerError("Invalid ip-proxy configuration", "NOT_IMPLEMENTED");
throw new NetSchedulerError(
"Invalid ip-proxy configuration",
"NOT_IMPLEMENTED"
);
}
try {
return await this.ipProxyRequest<R>(url, proxy);
} catch (e) {
ipProxyErrorCounter.add(1);
throw e;
} finally {
ipProxyCounter.add(1);
}
return await this.ipProxyRequest<R>(url, proxy);
default:
throw new NetSchedulerError(
`Proxy type ${proxy.type} not supported`,
@ -501,44 +518,69 @@ export class NetworkDelegate<const C extends NetworkConfig> {
url: string,
proxyDef: ProxyDef<IPProxyConfig>
): Promise<{ data: R; time: number }> {
const proxyName = Object.entries(this.proxies).find(([_, proxy]) => proxy === proxyDef)?.[0];
const proxyName = Object.entries(this.proxies).find(
([_, proxy]) => proxy === proxyDef
)?.[0];
if (!proxyName || !this.ipPools[proxyName]) {
throw new NetSchedulerError("IP pool not found", "IP_POOL_EXHAUSTED");
}
const ipPool = this.ipPools[proxyName];
const ipEntry = await ipPool.getNextIP();
const maxRetries = 3;
if (!ipEntry) {
throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED");
let lastError: Error | null = null;
for (let attempt = 0; attempt < maxRetries; attempt++) {
const ipEntry = await ipPool.getNextIP();
if (!ipEntry) {
throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED");
}
try {
const controller = new AbortController();
const now = Date.now();
const timeout = setTimeout(
() => controller.abort(),
ipEntry.lifespan - (now - ipEntry.createdAt)
);
const response = await fetch(url, {
signal: controller.signal,
proxy: `http://${ipEntry.address}:${ipEntry.port}`
});
clearTimeout(timeout);
const start = Date.now();
const data = await response.json();
const end = Date.now();
const serverTime = start + (end - start) / 2;
return { data: data as R, time: serverTime };
} catch (error) {
lastError = error as Error;
// If this is not the last attempt, retry immediately
if (attempt < maxRetries - 1) {
continue;
}
throw new NetSchedulerError(
"IP proxy request failed",
"IP_EXTRACTION_FAILED",
error
);
} finally {
await ipPool.markIPUsed(ipEntry.address);
}
}
try {
const controller = new AbortController();
const now = Date.now();
const timeout = setTimeout(() => controller.abort(), ipEntry.lifespan - (now - ipEntry.createdAt));
const response = await fetch(url, {
signal: controller.signal,
proxy: `http://${ipEntry.address}:${ipEntry.port}`
});
clearTimeout(timeout);
const start = Date.now();
const data = await response.json();
const end = Date.now();
const serverTime = start + (end - start) / 2;
// Mark IP as used
await ipPool.markIPUsed(ipEntry.address);
return { data: data as R, time: serverTime };
} catch (error) {
// Mark IP as used even if request failed (single-use strategy)
await ipPool.markIPUsed(ipEntry.address);
throw new NetSchedulerError("IP proxy request failed", "IP_EXTRACTION_FAILED", error);
}
throw new NetSchedulerError(
"IP proxy request failed after all retries",
"IP_EXTRACTION_FAILED",
lastError
);
}
}
@ -585,12 +627,12 @@ const proxies = {
port: number;
endtime: string;
city: string;
}[]
}[];
}
const url = Bun.env.IP_PROXY_EXTRACTOR_URL;
const response = await fetch(url);
const data = await response.json() as APIResponse;
if (data.code !== 0){
const data = (await response.json()) as APIResponse;
if (data.code !== 0) {
throw new Error(`IP proxy extractor failed with code ${data.code}`);
}
const ips = data.data;
@ -598,15 +640,15 @@ const proxies = {
return {
address: item.ip,
port: item.port,
lifespan: Date.parse(item.endtime+'+08') - Date.now(),
lifespan: Date.parse(item.endtime + "+08") - Date.now(),
createdAt: Date.now(),
used: false
}
})
};
});
},
strategy: "round-robin",
minPoolSize: 10,
maxPoolSize: 500,
maxPoolSize: 100,
refreshInterval: 5 * SECOND,
initialPoolSize: 10
}
@ -621,7 +663,7 @@ const config = {
proxies: proxies,
providers: {
test: { limiters: [] },
bilibili: { limiters: biliLimiterConfig }
bilibili: { limiters: [] }
},
tasks: {
test: {

View File

@ -15,6 +15,15 @@ const meterProvider = new MeterProvider({
});
const meter = meterProvider.getMeter("bullmq-worker");
const anotherMeter = meterProvider.getMeter("networking");
export const ipProxyCounter = anotherMeter.createCounter("ip_proxy_count", {
description: "Number of requests using IP proxy"
});
export const ipProxyErrorCounter = anotherMeter.createCounter("ip_proxy_error_count", {
description: "Number of errors thrown by IP proxy"
});
export const jobCounter = meter.createCounter("job_count", {
description: "Number of executed BullMQ jobs"
@ -30,4 +39,4 @@ export const jobDurationRaw = meter.createGauge("job_duration_raw", {
export const snapshotCounter = meter.createCounter("snapshot_count", {
description: "Number of snapshots taken"
});
});

View File

@ -0,0 +1,14 @@
-- Count upcoming snapshot-schedule records per hour for the next 24 hours,
-- broken down by schedule type. Used to visualize crawler scheduling load.
-- NOTE(review): SET TIME ZONE changes the whole session's time zone, not just
-- this query — confirm callers expect that side effect to persist.
SET TIME ZONE 'Asia/Shanghai';
SELECT
    date_trunc('hour', started_at) AS time_slot,
    type,
    COUNT(*) AS record_count
FROM
    snapshot_schedule
WHERE
    -- Only schedules starting from now up to 24 hours ahead.
    started_at <= NOW() + INTERVAL '24 hours'
    AND started_at >= NOW()
GROUP BY
    type, time_slot
ORDER BY
    type, time_slot;

View File

@ -0,0 +1,14 @@
-- Count video snapshots taken in the last 48 hours, bucketed into
-- 10-minute intervals. Used to monitor snapshot throughput over time.
SELECT
    -- Build the bucket start: truncate to the hour, then add back the
    -- minutes rounded DOWN to the nearest multiple of 10
    -- (e.g. 12:37 -> 12:30). The string concat '|| '' minutes''' forms
    -- an interval like '30 minutes'.
    date_trunc('hour', created_at)
    + (floor(extract(minute from created_at) / 10) * 10 || ' minutes')::interval
    AS time_slot,
    COUNT(*) AS record_count
FROM
    video_snapshot
WHERE
    -- Sliding 48-hour window ending at the current time.
    created_at >= NOW() - INTERVAL '48 hours'
    AND created_at <= NOW()
GROUP BY
    time_slot
ORDER BY
    time_slot;