diff --git a/.gitignore b/.gitignore index 5080ef6..c9064be 100644 --- a/.gitignore +++ b/.gitignore @@ -16,14 +16,14 @@ node_modules/ # project specific logs/ __pycache__ -ml/filter/runs -ml/pred/runs -ml/pred/checkpoints -ml/pred/observed -ml/data/ -ml/filter/checkpoints -scripts -model/ +/ml/filter/runs +/ml/pred/runs +/ml/pred/checkpoints +/ml/pred/observed +/ml/data/ +/ml/filter/checkpoints +/scripts +/model .astro @@ -32,19 +32,17 @@ model/ *.db *.sqlite *.sqlite3 -data/ -redis/ +/data +/redis # Build dist/ build/ -docker-compose.yml - -ucaptcha-config.yaml - temp/ -meili +datasets -datasets \ No newline at end of file +mutagen.yml + +mutagen.yml.lock \ No newline at end of file diff --git a/.idea/data_source_mapping.xml b/.idea/data_source_mapping.xml index e369253..7bc8724 100644 --- a/.idea/data_source_mapping.xml +++ b/.idea/data_source_mapping.xml @@ -3,5 +3,7 @@ + + \ No newline at end of file diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml index 6df4889..bf5df38 100644 --- a/.idea/sqldialects.xml +++ b/.idea/sqldialects.xml @@ -1,6 +1,8 @@ + + \ No newline at end of file diff --git a/Dockerfile.backend b/Dockerfile.backend deleted file mode 100644 index 12618e9..0000000 --- a/Dockerfile.backend +++ /dev/null @@ -1,23 +0,0 @@ -FROM oven/bun:1.2.8-debian - -WORKDIR /app - -COPY ./packages/core ./core - -COPY ./packages/backend/package.json ./packages/backend/bun.lock ./backend/ - -RUN apt update && apt install -y curl - -RUN ln -s /bin/uname /usr/bin/uname - -RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)" - -WORKDIR backend - -RUN bun install - -COPY ./packages/backend/ . - -RUN mkdir -p /app/logs - -CMD ["bun", "start"] \ No newline at end of file diff --git a/Dockerfile.crawler b/Dockerfile.crawler deleted file mode 100644 index aeb8230..0000000 --- a/Dockerfile.crawler +++ /dev/null @@ -1,19 +0,0 @@ -FROM oven/bun:1.2.8-debian - -WORKDIR /app - -COPY . . - -RUN bun i - -RUN mkdir -p /app/logs - -RUN apt update && apt install -y curl - -RUN ln -s /bin/uname /usr/bin/uname - -RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)" - -WORKDIR packages/crawler - -CMD ["bun", "all"] \ No newline at end of file diff --git a/Dockerfile.frontend b/Dockerfile.frontend deleted file mode 100644 index 4260127..0000000 --- a/Dockerfile.frontend +++ /dev/null @@ -1,23 +0,0 @@ -FROM oven/bun - -ARG BACKEND_URL - -WORKDIR /app - -COPY . . - -RUN bun install - -WORKDIR packages/frontend - -RUN bun run build - -ENV HOST=0.0.0.0 -ENV PORT=4321 -ENV BACKEND_URL=${BACKEND_URL} - -EXPOSE 4321 - -RUN mkdir -p /app/logs - -CMD ["bun", "/app/packages/frontend/dist/server/entry.mjs"] diff --git a/Dockerfile.next b/Dockerfile.next deleted file mode 100644 index 081a8b0..0000000 --- a/Dockerfile.next +++ /dev/null @@ -1,14 +0,0 @@ -FROM node:lts-slim AS production - -WORKDIR /app - -COPY ./packages/next/.next ./.next -COPY ./packages/next/public ./public -COPY ./packages/next/package.json ./package.json -COPY ./packages/next/node_modules ./node_modules - -ENV NODE_ENV production - -EXPOSE 7400 - -CMD ["npm", "start"] diff --git a/docker-compose.example.yml b/docker-compose.example.yml deleted file mode 100644 index f01d5b6..0000000 --- a/docker-compose.example.yml +++ /dev/null @@ -1,71 +0,0 @@ -version: '3.8' - -services: - db: - image: postgres:17 - ports: - - "5431:5432" - environment: - POSTGRES_USER: cvsa - POSTGRES_PASSWORD: "" - POSTGRES_DB: cvsa_main - volumes: - - ./data:/var/lib/postgresql/data - - redis: - image: redis:latest - ports: - - "6378:6379" - volumes: - - ./redis/data:/data - - ./redis/redis.conf:/usr/local/etc/redis/redis.conf - - ./redis/logs:/logs - - frontend: - build: - context: . - dockerfile: Dockerfile.frontend - ports: - - "4321:4321" - environment: - - HOST=0.0.0.0 - - PORT=4321 - - DB_HOST=db - - DB_NAME=cvsa_main - - DB_NAME_CRED=cvsa_cred - - DB_USER=cvsa - - DB_PORT=5432 - - DB_PASSWORD="" - - LOG_VERBOSE=/app/logs/verbose.log - - LOG_WARN=/app/logs/warn.log - - LOG_ERR=/app/logs/error.log - depends_on: - - db - volumes: - - /path/to/your/logs:/app/logs - backend: - build: - context: . - dockerfile: Dockerfile.backend - ports: - - "8000:8000" - environment: - - HOST=0.0.0.0 - - DB_HOST=db - - DB_NAME=cvsa_main - - DB_NAME_CRED=cvsa_cred - - DB_USER=cvsa - - DB_PORT=5432 - - DB_PASSWORD="" - - LOG_VERBOSE=/app/logs/verbose.log - - LOG_WARN=/app/logs/warn.log - - LOG_ERR=/app/logs/error.log - - REDIS_HOST=redis - - REDIS_PORT=6379 - depends_on: - - db - volumes: - - /path/to/your/logs:/app/logs - -volumes: - db_data: \ No newline at end of file diff --git a/packages/core/net/delegate.ts b/packages/core/net/delegate.ts index 6292713..b48cfb9 100644 --- a/packages/core/net/delegate.ts +++ b/packages/core/net/delegate.ts @@ -14,6 +14,7 @@ import * as OpenApi from "@alicloud/openapi-client"; import Stream from "@alicloud/darabonba-stream"; import * as Util from "@alicloud/tea-util"; import { Readable } from "stream"; +import { ipProxyCounter, ipProxyErrorCounter } from "crawler/metrics"; type ProxyType = "native" | "alicloud-fc" | "ip-proxy"; @@ -23,8 +24,7 @@ interface FCResponse { serverTime: number; } -interface NativeProxyData { -} +interface NativeProxyData {} interface AlicloudFcProxyData { region: string; @@ -288,8 +288,12 @@ export class NetworkDelegate { if (isIpProxy(proxyDef)) { this.ipPools[proxyName] = new IPPoolManager(proxyDef.data); // Initialize asynchronously but don't wait - this.ipPools[proxyName].initialize().catch(error => { - logger.error(error as Error, "net", `Failed to initialize IP pool for ${proxyName}`); + this.ipPools[proxyName].initialize().catch((error) => { + logger.error( + error as Error, + "net", + `Failed to initialize IP pool for ${proxyName}` + ); }); } } @@ -416,14 +420,27 @@ export class NetworkDelegate { return await this.nativeRequest(url); case "alicloud-fc": if (!isAlicloudFcProxy(proxy)) { - throw new NetSchedulerError("Invalid alicloud-fc proxy configuration", "ALICLOUD_PROXY_ERR"); + throw new NetSchedulerError( + "Invalid alicloud-fc proxy configuration", + "ALICLOUD_PROXY_ERR" + ); } return await this.alicloudFcRequest(url, proxy.data); case "ip-proxy": if (!isIpProxy(proxy)) { - throw new NetSchedulerError("Invalid ip-proxy configuration", "NOT_IMPLEMENTED"); + throw new NetSchedulerError( + "Invalid ip-proxy configuration", + "NOT_IMPLEMENTED" + ); + } + try { + return await this.ipProxyRequest(url, proxy); + } catch (e) { + ipProxyErrorCounter.add(1); + throw e; + } finally { + ipProxyCounter.add(1); } - return await this.ipProxyRequest(url, proxy); default: throw new NetSchedulerError( `Proxy type ${proxy.type} not supported`, @@ -501,44 +518,69 @@ export class NetworkDelegate { url: string, proxyDef: ProxyDef ): Promise<{ data: R; time: number }> { - const proxyName = Object.entries(this.proxies).find(([_, proxy]) => proxy === proxyDef)?.[0]; + const proxyName = Object.entries(this.proxies).find( + ([_, proxy]) => proxy === proxyDef + )?.[0]; if (!proxyName || !this.ipPools[proxyName]) { throw new NetSchedulerError("IP pool not found", "IP_POOL_EXHAUSTED"); } const ipPool = this.ipPools[proxyName]; - const ipEntry = await ipPool.getNextIP(); + const maxRetries = 3; - if (!ipEntry) { - throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED"); + let lastError: Error | null = null; + + for (let attempt = 0; attempt < maxRetries; attempt++) { + const ipEntry = await ipPool.getNextIP(); + + if (!ipEntry) { + throw new NetSchedulerError("No IP available in pool", "IP_POOL_EXHAUSTED"); + } + + try { + const controller = new AbortController(); + const now = Date.now(); + const timeout = setTimeout( + () => controller.abort(), + ipEntry.lifespan - (now - ipEntry.createdAt) + ); + + const response = await fetch(url, { + signal: controller.signal, + proxy: `http://${ipEntry.address}:${ipEntry.port}` + }); + + clearTimeout(timeout); + + const start = Date.now(); + const data = await response.json(); + const end = Date.now(); + const serverTime = start + (end - start) / 2; + + return { data: data as R, time: serverTime }; + } catch (error) { + lastError = error as Error; + + // If this is not the last attempt, retry immediately + if (attempt < maxRetries - 1) { + continue; + } + + throw new NetSchedulerError( + "IP proxy request failed", + "IP_EXTRACTION_FAILED", + error + ); + } finally { + await ipPool.markIPUsed(ipEntry.address); + } } - try { - const controller = new AbortController(); - const now = Date.now(); - const timeout = setTimeout(() => controller.abort(), ipEntry.lifespan - (now - ipEntry.createdAt)); - - const response = await fetch(url, { - signal: controller.signal, - proxy: `http://${ipEntry.address}:${ipEntry.port}` - }); - - clearTimeout(timeout); - - const start = Date.now(); - const data = await response.json(); - const end = Date.now(); - const serverTime = start + (end - start) / 2; - - // Mark IP as used - await ipPool.markIPUsed(ipEntry.address); - - return { data: data as R, time: serverTime }; - } catch (error) { - // Mark IP as used even if request failed (single-use strategy) - await ipPool.markIPUsed(ipEntry.address); - throw new NetSchedulerError("IP proxy request failed", "IP_EXTRACTION_FAILED", error); - } + throw new NetSchedulerError( + "IP proxy request failed after all retries", + "IP_EXTRACTION_FAILED", + lastError + ); } } @@ -585,12 +627,12 @@ const proxies = { port: number; endtime: string; city: string; - }[] + }[]; } const url = Bun.env.IP_PROXY_EXTRACTOR_URL; const response = await fetch(url); - const data = await response.json() as APIResponse; - if (data.code !== 0){ + const data = (await response.json()) as APIResponse; + if (data.code !== 0) { throw new Error(`IP proxy extractor failed with code ${data.code}`); } const ips = data.data; @@ -598,15 +640,15 @@ const proxies = { return { address: item.ip, port: item.port, - lifespan: Date.parse(item.endtime+'+08') - Date.now(), + lifespan: Date.parse(item.endtime + "+08") - Date.now(), createdAt: Date.now(), used: false - } - }) + }; + }); }, strategy: "round-robin", minPoolSize: 10, - maxPoolSize: 500, + maxPoolSize: 100, refreshInterval: 5 * SECOND, initialPoolSize: 10 } @@ -621,7 +663,7 @@ const config = { proxies: proxies, providers: { test: { limiters: [] }, - bilibili: { limiters: biliLimiterConfig } + bilibili: { limiters: [] } }, tasks: { test: { diff --git a/packages/crawler/metrics/index.ts b/packages/crawler/metrics/index.ts index 9851cfc..8e20b56 100644 --- a/packages/crawler/metrics/index.ts +++ b/packages/crawler/metrics/index.ts @@ -15,6 +15,15 @@ const meterProvider = new MeterProvider({ }); const meter = meterProvider.getMeter("bullmq-worker"); +const anotherMeter = meterProvider.getMeter("networking"); + +export const ipProxyCounter = anotherMeter.createCounter("ip_proxy_count", { + description: "Number of requests using IP proxy" +}); + +export const ipProxyErrorCounter = anotherMeter.createCounter("ip_proxy_error_count", { + description: "Number of errors thrown by IP proxy" +}); export const jobCounter = meter.createCounter("job_count", { description: "Number of executed BullMQ jobs" @@ -30,4 +39,4 @@ export const jobDurationRaw = meter.createGauge("job_duration_raw", { export const snapshotCounter = meter.createCounter("snapshot_count", { description: "Number of snapshots taken" -}); \ No newline at end of file +}); diff --git a/queries/schedule_count.sql b/queries/schedule_count.sql new file mode 100644 index 0000000..8e99df3 --- /dev/null +++ b/queries/schedule_count.sql @@ -0,0 +1,14 @@ +SET TIME ZONE 'Asia/Shanghai'; +SELECT + date_trunc('hour', started_at) AS time_slot, + type, + COUNT(*) AS record_count +FROM + snapshot_schedule +WHERE + started_at <= NOW() + INTERVAL '24 hours' + AND started_at >= NOW() +GROUP BY + type, time_slot +ORDER BY + type, time_slot \ No newline at end of file diff --git a/queries/snapshots_count.sql b/queries/snapshots_count.sql new file mode 100644 index 0000000..0a29b16 --- /dev/null +++ b/queries/snapshots_count.sql @@ -0,0 +1,14 @@ +SELECT + date_trunc('hour', created_at) + + (floor(extract(minute from created_at) / 10) * 10 || ' minutes')::interval + AS time_slot, + COUNT(*) AS record_count +FROM + video_snapshot +WHERE + created_at >= NOW() - INTERVAL '48 hours' + AND created_at <= NOW() +GROUP BY + time_slot +ORDER BY + time_slot; \ No newline at end of file