diff --git a/.gitignore b/.gitignore
index 31d6ddf..710d3e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,7 +51,6 @@ internal/
 !tests/cases/projects/projectOption/**/node_modules
 !tests/cases/projects/NodeModulesSearch/**/*
 !tests/baselines/reference/project/nodeModules*/**/*
-.idea
 yarn.lock
 yarn-error.log
 .parallelperf.*
@@ -78,10 +77,11 @@ node_modules/
 # project specific
 logs/
 __pycache__
-filter/runs
-pred/runs
-pred/checkpoints
-data/
-filter/checkpoints
+ml/filter/runs
+ml/pred/runs
+ml/pred/checkpoints
+ml/pred/observed
+ml/data/
+ml/filter/checkpoints
 scripts
 model/
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..518076d
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,9 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
+dataSources.xml
\ No newline at end of file
diff --git a/.idea/cvsa.iml b/.idea/cvsa.iml
new file mode 100644
index 0000000..c155925
--- /dev/null
+++ b/.idea/cvsa.iml
@@ -0,0 +1,21 @@
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
new file mode 100644
index 0000000..5535e8f
--- /dev/null
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -0,0 +1,12 @@
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..4552e71
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
diff --git a/.idea/sqldialects.xml b/.idea/sqldialects.xml
new file mode 100644
index 0000000..6df4889
--- /dev/null
+++ b/.idea/sqldialects.xml
@@ -0,0 +1,6 @@
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..35eb1dd
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
diff --git a/.zed/settings.json b/.zed/settings.json
index 97f9ab8..a58d028 100644
--- a/.zed/settings.json
+++ b/.zed/settings.json
@@ -3,33 +3,33 @@
 // For a full list of overridable settings, and general information on folder-specific settings,
 // see the documentation: https://zed.dev/docs/configuring-zed#settings-files
 {
-    "lsp": {
-        "deno": {
-            "settings": {
-                "deno": {
-                    "enable": true
-                }
-            }
-        }
-    },
-    "languages": {
-        "TypeScript": {
-            "language_servers": [
-                "deno",
-                "!typescript-language-server",
-                "!vtsls",
-                "!eslint"
-            ],
-            "formatter": "language_server"
-        },
-        "TSX": {
-            "language_servers": [
-                "deno",
-                "!typescript-language-server",
-                "!vtsls",
-                "!eslint"
-            ],
-            "formatter": "language_server"
-        }
-    }
+    "lsp": {
+        "deno": {
+            "settings": {
+                "deno": {
+                    "enable": true
+                }
+            }
+        }
+    },
+    "languages": {
+        "TypeScript": {
+            "language_servers": [
+                "deno",
+                "!typescript-language-server",
+                "!vtsls",
+                "!eslint"
+            ],
+            "formatter": "language_server"
+        },
+        "TSX": {
+            "language_servers": [
+                "deno",
+                "!typescript-language-server",
+                "!vtsls",
+                "!eslint"
+            ],
+            "formatter": "language_server"
+        }
+    }
 }
diff --git a/README-refactor.md b/README-refactor.md
new file mode 100644
index 0000000..75ffdb9
--- /dev/null
+++ b/README-refactor.md
@@ -0,0 +1,65 @@
+# Project Refactoring Plan
+
+## Target Architecture
+Manage three independent parts with a monorepo layout:
+1. `packages/crawler` - the existing crawler
+2. `packages/frontend` - an Astro-based frontend
+3. `packages/backend` - a Hono-based API backend
+
+## Directory Restructuring
+
+### New structure
+```
+.
+├── packages/
+│   ├── crawler/    # Crawler component
+│   ├── frontend/   # Astro frontend
+│   ├── backend/    # Hono backend API
+│   └── core/       # Shared code (extracted later)
+├── docs/           # Documentation
+├── scripts/        # Project scripts
+└── package.json    # Root project configuration
+```
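+
+If the root ends up managed with Deno rather than npm, a workspace-style root config is
+one option. A minimal sketch (hypothetical: member names and tasks are assumptions, and
+this PR does not add such a file):
+
+```jsonc
+{
+    // Hypothetical root deno.json: each member keeps its own deno.json.
+    "workspace": ["./packages/crawler", "./packages/backend", "./packages/core"],
+    "tasks": {
+        // Example: run a member's task from the repo root via --filter.
+        "crawler:worker": "deno task --filter @cvsa/crawler worker:main"
+    }
+}
+```
+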
+### Migration Details
+
+#### 1. Crawler (crawler)
+Keep the following directories/files:
+- `lib/` (everything not frontend-related)
+- `src/db/raw/`
+- `src/filterWorker.ts`
+- `src/worker.ts`
+- `test/`
+- `deno.json`
+- `.gitignore`
+
+To be removed:
+- Fresh framework files
+- Frontend components (`components/`)
+- Static assets (`static/`)
+
+#### 2. Frontend (frontend)
+Create a brand-new Astro project; keep none of the existing frontend code
+
+#### 3. Backend (backend)
+Create a brand-new Hono project
+
+#### 4. Shared code (core)
+The following can later be extracted from the crawler into the core package
+(a usage sketch follows this file):
+- Database: `lib/db/`
+- Message queue: `lib/mq/`
+- Networking: `lib/net/`
+- Utility functions: `lib/utils/`
+
+## Suggested Refactoring Steps
+
+1. Initialize the monorepo structure
+2. Move the crawler code into `packages/crawler`
+3. Create the new Astro project in `packages/frontend`
+4. Create the new Hono project in `packages/backend`
+5. Gradually extract shared code into `packages/core`
+
+## Notes
+- Machine-learning code (`pred/`, `filter/`, `lab/`) stays as-is for now
+- Documentation (`doc/`) can move into the `docs/` directory
+- CI/CD pipelines need to be updated for the monorepo
\ No newline at end of file
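As a sketch of what step 5 could look like from the crawler's side once `packages/core`
is populated (import specifiers are hypothetical; in this PR `packages/core/deno.json`
is still an empty file):

```ts
// Hypothetical post-extraction imports: crawler code would depend on the
// shared @cvsa/core package instead of project-relative lib/ specifiers.
import { db } from "@cvsa/core/db/init.ts";
import { formatTimestampToPsql } from "@cvsa/core/utils/formatTimestampToPostgre.ts";

const client = await db.connect();
try {
	// Call sites stay the same; only the import paths change.
	console.log(formatTimestampToPsql(Date.now()));
} finally {
	client.release();
}
```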
diff --git a/components/Button.tsx b/components/Button.tsx
deleted file mode 100644
index 6e868c5..0000000
--- a/components/Button.tsx
+++ /dev/null
@@ -1,12 +0,0 @@
-import { JSX } from "preact";
-import { IS_BROWSER } from "$fresh/runtime.ts";
-
-export function Button(props: JSX.HTMLAttributes) {
-    return (
-
-
-            {props.count}
-
-    );
-}
diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts
deleted file mode 100644
index 663a628..0000000
--- a/lib/db/snapshot.ts
+++ /dev/null
@@ -1,190 +0,0 @@
-import { DAY, SECOND } from "$std/datetime/constants.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { VideoSnapshotType } from "lib/db/schema.d.ts";
-import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
-
-export async function getSongsNearMilestone(client: Client) {
-    const queryResult = await client.queryObject<VideoSnapshotType>(`
-        WITH max_views_per_aid AS (
-            -- Find the maximum views per aid, and make sure the aid exists in the songs table
-            SELECT
-                vs.aid,
-                MAX(vs.views) AS max_views
-            FROM
-                video_snapshot vs
-            INNER JOIN
-                songs s
-            ON
-                vs.aid = s.aid
-            GROUP BY
-                vs.aid
-        ),
-        filtered_max_views AS (
-            -- Keep only the rows whose maximum views satisfy the condition
-            SELECT
-                aid,
-                max_views
-            FROM
-                max_views_per_aid
-            WHERE
-                (max_views >= 90000 AND max_views < 100000) OR
-                (max_views >= 900000 AND max_views < 1000000)
-        )
-        -- Fetch the full rows that match
-        SELECT
-            vs.*
-        FROM
-            video_snapshot vs
-        INNER JOIN
-            filtered_max_views fmv
-        ON
-            vs.aid = fmv.aid AND vs.views = fmv.max_views
-    `);
-    return queryResult.rows.map((row) => {
-        return {
-            ...row,
-            aid: Number(row.aid),
-        };
-    });
-}
-
-export async function getUnsnapshotedSongs(client: Client) {
-    const queryResult = await client.queryObject<{ aid: bigint }>(`
-        SELECT DISTINCT s.aid
-        FROM songs s
-        LEFT JOIN video_snapshot v ON s.aid = v.aid
-        WHERE v.aid IS NULL;
-    `);
-    return queryResult.rows.map((row) => Number(row.aid));
-}
-
-export async function getSongSnapshotCount(client: Client, aid: number) {
-    const queryResult = await client.queryObject<{ count: number }>(
-        `
-        SELECT COUNT(*) AS count
-        FROM video_snapshot
-        WHERE aid = $1;
-    `,
-        [aid],
-    );
-    return queryResult.rows[0].count;
-}
-
-export async function getShortTermEtaPrediction(client: Client, aid: number) {
-    const queryResult = await client.queryObject<{ eta: number }>(
-        `
-        WITH old_snapshot AS (
-            SELECT created_at, views
-            FROM video_snapshot
-            WHERE aid = $1 AND
-                NOW() - created_at > '20 min'
-            ORDER BY created_at DESC
-            LIMIT 1
-        ),
-        new_snapshot AS (
-            SELECT created_at, views
-            FROM video_snapshot
-            WHERE aid = $1
-            ORDER BY created_at DESC
-            LIMIT 1
-        )
-        SELECT
-            CASE
-                WHEN n.views > 100000
-                THEN
-                    (1000000 - n.views) -- Views remaining
-                    /
-                    (
-                        (n.views - o.views) -- Views delta
-                        /
-                        (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
-                        + 0.001
-                    ) -- Increment per second
-                ELSE
-                    (100000 - n.views) -- Views remaining
-                    /
-                    (
-                        (n.views - o.views) -- Views delta
-                        /
-                        (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
-                        + 0.001
-                    ) -- Increment per second
-            END AS eta
-        FROM old_snapshot o, new_snapshot n;
-    `,
-        [aid],
-    );
-    if (queryResult.rows.length === 0) {
-        return null;
-    }
-    return queryResult.rows[0].eta;
-}
-
-export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
-    const count = await getSongSnapshotCount(client, aid);
-    if (count < 2) {
-        return true;
-    }
-    const queryResult = await client.queryObject<
-        { views1: number; created_at1: string; views2: number; created_at2: string }
-    >(
-        `
-        WITH latest_snapshot AS (
-            SELECT
-                aid,
-                views,
-                created_at
-            FROM video_snapshot
-            WHERE aid = $1
-            ORDER BY created_at DESC
-            LIMIT 1
-        ),
-        pairs AS (
-            SELECT
-                a.views AS views1,
-                a.created_at AS created_at1,
-                b.views AS views2,
-                b.created_at AS created_at2,
-                (b.created_at - a.created_at) AS interval
-            FROM video_snapshot a
-            JOIN latest_snapshot b
-            ON a.aid = b.aid
-            AND a.created_at < b.created_at
-        )
-        SELECT
-            views1,
-            created_at1,
-            views2,
-            created_at2
-        FROM (
-            SELECT
-                *,
-                ROW_NUMBER() OVER (
-                    ORDER BY
-                        CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
-                        CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
-                ) AS rn
-            FROM pairs
-        ) ranked
-        WHERE rn = 1;
-    `,
-        [aid],
-    );
-    if (queryResult.rows.length === 0) {
-        return true;
-    }
-    const recentViewsData = queryResult.rows[0];
-    const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
-    const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
-    const intervalSec = (time2 - time1) / SECOND;
-    const views1 = recentViewsData.views1;
-    const views2 = recentViewsData.views2;
-    const viewsDiff = views2 - views1;
-    if (viewsDiff == 0) {
-        return false;
-    }
-    const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
-    const expectedViewsDiff = nextMilestone - views2;
-    const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
-    return expectedIntervalSec <= 3 * DAY;
-}
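The deleted `getShortTermEtaPrediction` query above is plain rate arithmetic: remaining
views divided by views gained per second. An equivalent TypeScript sketch of the same
formula (types are illustrative; the `0.001` terms guard against division by zero,
exactly as in the SQL):

```ts
interface SnapshotPoint {
	views: number;
	createdAt: number; // Unix timestamp in milliseconds
}

// ETA in seconds until the next milestone (100k or 1M views),
// mirroring the CASE expression in the deleted query.
function shortTermEta(oldSnap: SnapshotPoint, newSnap: SnapshotPoint): number {
	const nextMilestone = newSnap.views > 100000 ? 1000000 : 100000;
	const deltaSeconds = (newSnap.createdAt - oldSnap.createdAt) / 1000 + 0.001;
	const viewsPerSecond = (newSnap.views - oldSnap.views) / deltaSeconds + 0.001;
	return (nextMilestone - newSnap.views) / viewsPerSecond;
}
```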
diff --git a/lib/ml/filter_inference.ts b/lib/ml/filter_inference.ts
deleted file mode 100644
index 019061f..0000000
--- a/lib/ml/filter_inference.ts
+++ /dev/null
@@ -1,99 +0,0 @@
-import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
-import * as ort from "onnxruntime";
-import logger from "lib/log/logger.ts";
-import { WorkerError } from "lib/mq/schema.ts";
-
-const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
-const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
-const onnxEmbeddingOriginalPath = "./model/model.onnx";
-export const modelVersion = "3.17";
-
-let sessionClassifier: ort.InferenceSession | null = null;
-let sessionEmbedding: ort.InferenceSession | null = null;
-let tokenizer: PreTrainedTokenizer | null = null;
-
-export async function initializeModels() {
-    if (tokenizer && sessionClassifier && sessionEmbedding) {
-        return;
-    }
-
-    try {
-        tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
-
-        const [classifierSession, embeddingSession] = await Promise.all([
-            ort.InferenceSession.create(onnxClassifierPath),
-            ort.InferenceSession.create(onnxEmbeddingOriginalPath),
-        ]);
-
-        sessionClassifier = classifierSession;
-        sessionEmbedding = embeddingSession;
-        logger.log("Filter models initialized", "ml");
-    } catch (error) {
-        throw new WorkerError(error as Error, "ml", "fn:initializeModels");
-    }
-}
-
-export function softmax(logits: Float32Array): number[] {
-    const maxLogit = Math.max(...logits);
-    const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
-    const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
-    return Array.from(exponents.map((exp) => exp / sumOfExponents));
-}
-
-async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
-    if (!tokenizer) {
-        throw new Error("Tokenizer is not initialized. 
Call initializeModels() first."); - } - const { input_ids } = await tokenizer(texts, { - add_special_tokens: false, - return_tensor: false, - }); - - const cumsum = (arr: number[]): number[] => - arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []); - - const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))]; - const flattened_input_ids = input_ids.flat(); - - const inputs = { - input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ - flattened_input_ids.length, - ]), - offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), - }; - - const { embeddings } = await session.run(inputs); - return Array.from(embeddings.data as Float32Array); -} - -async function runClassification(embeddings: number[]): Promise { - if (!sessionClassifier) { - throw new Error("Classifier session is not initialized. Call initializeModels() first."); - } - const inputTensor = new ort.Tensor( - Float32Array.from(embeddings), - [1, 3, 1024], - ); - - const { logits } = await sessionClassifier.run({ channel_features: inputTensor }); - return softmax(logits.data as Float32Array); -} - -export async function classifyVideo( - title: string, - description: string, - tags: string, - aid: number, -): Promise { - if (!sessionEmbedding) { - throw new Error("Embedding session is not initialized. Call initializeModels() first."); - } - const embeddings = await getONNXEmbeddings([ - title, - description, - tags, - ], sessionEmbedding); - const probabilities = await runClassification(embeddings); - logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml"); - return probabilities.indexOf(Math.max(...probabilities)); -} diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts deleted file mode 100644 index 12443ff..0000000 --- a/lib/mq/exec/snapshotTick.ts +++ /dev/null @@ -1,229 +0,0 @@ -import { Job } from "bullmq"; -import { MINUTE, SECOND } from "$std/datetime/constants.ts"; -import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { db } from "lib/db/init.ts"; -import { - getShortTermEtaPrediction, - getSongsNearMilestone, - getUnsnapshotedSongs, - songEligibleForMilestoneSnapshot, -} from "lib/db/snapshot.ts"; -import { SnapshotQueue } from "lib/mq/index.ts"; -import { insertVideoStats } from "lib/mq/task/getVideoStats.ts"; -import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; -import { redis } from "lib/db/redis.ts"; -import { NetSchedulerError } from "lib/mq/scheduler.ts"; -import logger from "lib/log/logger.ts"; -import { formatSeconds } from "lib/utils/formatSeconds.ts"; -import { truncate } from "lib/utils/truncate.ts"; - -async function snapshotScheduled(aid: number) { - try { - return await redis.exists(`cvsa:snapshot:${aid}`); - } catch { - logger.error(`Failed to check scheduled status for ${aid}`, "mq"); - return false; - } -} - -async function setSnapshotScheduled(aid: number, value: boolean, exp: number) { - try { - if (value) { - await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp); - } else { - await redis.del(`cvsa:snapshot:${aid}`); - } - } catch { - logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq"); - } -} - -interface SongNearMilestone { - aid: number; - id: number; - created_at: string; - views: number; - coins: number; - likes: number; - favorites: number; - shares: number; - danmakus: number; - replies: number; -} - -async function 
processMilestoneSnapshots(client: Client, videosNearMilestone: SongNearMilestone[]) {
-    let i = 0;
-    for (const snapshot of videosNearMilestone) {
-        if (await snapshotScheduled(snapshot.aid)) {
-            logger.silly(
-                `Video ${snapshot.aid} is already scheduled for snapshot`,
-                "mq",
-                "fn:processMilestoneSnapshots",
-            );
-            continue;
-        }
-        if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
-            logger.silly(
-                `Video ${snapshot.aid} is not eligible for milestone snapshot`,
-                "mq",
-                "fn:processMilestoneSnapshots",
-            );
-            continue;
-        }
-        const factor = Math.floor(i / 8);
-        const delayTime = factor * SECOND * 2;
-        await SnapshotQueue.add("snapshotMilestoneVideo", {
-            aid: snapshot.aid,
-            currentViews: snapshot.views,
-            snapshotedAt: snapshot.created_at,
-        }, { delay: delayTime, priority: 1 });
-        await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
-        i++;
-    }
-}
-
-async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
-    let i = 0;
-    for (const aid of unsnapshotedVideos) {
-        if (await snapshotScheduled(aid)) {
-            logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
-            continue;
-        }
-        const factor = Math.floor(i / 5);
-        const delayTime = factor * SECOND * 4;
-        await SnapshotQueue.add("snapshotVideo", {
-            aid,
-        }, { delay: delayTime, priority: 3 });
-        await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-        i++;
-    }
-}
-
-export const snapshotTickWorker = async (_job: Job) => {
-    const client = await db.connect();
-    try {
-        const videosNearMilestone = await getSongsNearMilestone(client);
-        await processMilestoneSnapshots(client, videosNearMilestone);
-
-        const unsnapshotedVideos = await getUnsnapshotedSongs(client);
-        await processUnsnapshotedVideos(unsnapshotedVideos);
-    } finally {
-        client.release();
-    }
-};
-
-export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
-    const client = await db.connect();
-    await setSnapshotScheduled(job.data.aid, true, 20 * 60);
-    try {
-        const aid: number = job.data.aid;
-        const currentViews: number = job.data.currentViews;
-        const lastSnapshoted: string = job.data.snapshotedAt;
-        const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
-        if (typeof stat === "number") {
-            if (stat === -404 || stat === 62002 || stat == 62012) {
-                await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-            } else {
-                await setSnapshotScheduled(aid, false, 0);
-            }
-            return;
-        }
-        const nextMilestone = currentViews >= 100000 ? 
1000000 : 100000; - if (stat.views >= nextMilestone) { - await setSnapshotScheduled(aid, false, 0); - return; - } - let eta = await getShortTermEtaPrediction(client, aid); - if (eta === null) { - const DELTA = 0.001; - const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; - const viewsIncrement = stat.views - currentViews; - const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA); - const viewsToIncrease = nextMilestone - stat.views; - eta = viewsToIncrease / (incrementSpeed + DELTA); - } - const scheduledNextSnapshotDelay = eta * SECOND / 3; - const maxInterval = 20 * MINUTE; - const minInterval = 1 * SECOND; - const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); - await SnapshotQueue.add("snapshotMilestoneVideo", { - aid, - currentViews: stat.views, - snapshotedAt: stat.time, - }, { delay, priority: 1 }); - await job.updateData({ - ...job.data, - updatedViews: stat.views, - updatedTime: new Date(stat.time).toISOString(), - etaInMins: eta / 60, - }); - logger.log( - `Scheduled next milestone snapshot for ${aid} in ${ - formatSeconds(delay / 1000) - }, current views: ${stat.views}`, - "mq", - ); - } catch (e) { - if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { - logger.warn( - `No available proxy for aid ${job.data.aid}.`, - "mq", - "fn:takeSnapshotForMilestoneVideoWorker", - ); - await SnapshotQueue.add("snapshotMilestoneVideo", { - aid: job.data.aid, - currentViews: job.data.currentViews, - snapshotedAt: job.data.snapshotedAt, - }, { delay: 5 * SECOND, priority: 1 }); - return; - } - throw e; - } finally { - client.release(); - } -}; - -export const takeSnapshotForVideoWorker = async (job: Job) => { - const client = await db.connect(); - await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60); - try { - const { aid } = job.data; - const stat = await insertVideoStats(client, aid, "getVideoInfo"); - if (typeof stat === "number") { - if (stat === -404 || stat === 62002 || stat == 62012) { - await setSnapshotScheduled(aid, true, 6 * 60 * 60); - } else { - await setSnapshotScheduled(aid, false, 0); - } - return; - } - logger.log(`Taken snapshot for ${aid}`, "mq"); - if (stat == null) { - setSnapshotScheduled(aid, false, 0); - return; - } - await job.updateData({ - ...job.data, - updatedViews: stat.views, - updatedTime: new Date(stat.time).toISOString(), - }); - const nearMilestone = (stat.views >= 90000 && stat.views < 100000) || - (stat.views >= 900000 && stat.views < 1000000); - if (nearMilestone) { - await SnapshotQueue.add("snapshotMilestoneVideo", { - aid, - currentViews: stat.views, - snapshotedAt: stat.time, - }, { delay: 0, priority: 1 }); - } - await setSnapshotScheduled(aid, false, 0); - } catch (e) { - if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") { - await setSnapshotScheduled(job.data.aid, false, 0); - return; - } - throw e; - } finally { - client.release(); - } -}; diff --git a/lib/mq/executors.ts b/lib/mq/executors.ts deleted file mode 100644 index 85c2cc1..0000000 --- a/lib/mq/executors.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "lib/mq/exec/getLatestVideos.ts"; diff --git a/lib/mq/init.ts b/lib/mq/init.ts deleted file mode 100644 index 95693ab..0000000 --- a/lib/mq/init.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { MINUTE } from "$std/datetime/constants.ts"; -import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; -import logger from "lib/log/logger.ts"; - -export async function initMQ() { - await 
LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
-        every: 1 * MINUTE,
-        immediately: true,
-    });
-    await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
-        every: 5 * MINUTE,
-        immediately: true,
-    });
-    await LatestVideosQueue.upsertJobScheduler("collectSongs", {
-        every: 3 * MINUTE,
-        immediately: true,
-    });
-    await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
-        every: 3 * MINUTE,
-        immediately: true,
-    });
-
-    logger.log("Message queue initialized.");
-}
diff --git a/lib/mq/task/getVideoStats.ts b/lib/mq/task/getVideoStats.ts
deleted file mode 100644
index 274d1bb..0000000
--- a/lib/mq/task/getVideoStats.ts
+++ /dev/null
@@ -1,32 +0,0 @@
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { getVideoInfo } from "lib/net/getVideoInfo.ts";
-
-export async function insertVideoStats(client: Client, aid: number, task: string) {
-    const data = await getVideoInfo(aid, task);
-    const time = new Date().getTime();
-    if (typeof data == 'number') {
-        return data;
-    }
-    const views = data.stat.view;
-    const danmakus = data.stat.danmaku;
-    const replies = data.stat.reply;
-    const likes = data.stat.like;
-    const coins = data.stat.coin;
-    const shares = data.stat.share;
-    const favorites = data.stat.favorite;
-    await client.queryObject(`
-        INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
-        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
-    `, [aid, views, danmakus, replies, likes, coins, shares, favorites]);
-    return {
-        aid,
-        views,
-        danmakus,
-        replies,
-        likes,
-        coins,
-        shares,
-        favorites,
-        time
-    }
-}
diff --git a/lib/net/getVideoInfo.ts b/lib/net/getVideoInfo.ts
deleted file mode 100644
index c35bf56..0000000
--- a/lib/net/getVideoInfo.ts
+++ /dev/null
@@ -1,14 +0,0 @@
-import netScheduler from "lib/mq/scheduler.ts";
-import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
-import logger from "lib/log/logger.ts";
-
-export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
-    const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
-    const data = await netScheduler.request<VideoInfoResponse>(url, task);
-    const errMessage = `Error fetching metadata for ${aid}:`;
-    if (data.code !== 0) {
-        logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");
-        return data.code;
-    }
-    return data.data;
-}
diff --git a/main.ts b/main.ts
deleted file mode 100644
index 675f529..0000000
--- a/main.ts
+++ /dev/null
@@ -1,13 +0,0 @@
-/// <reference no-default-lib="true" />
-/// <reference lib="dom" />
-/// <reference lib="dom.iterable" />
-/// <reference lib="dom.asynciterable" />
-/// <reference lib="deno.ns" />
-
-import "$std/dotenv/load.ts";
-
-import { start } from "$fresh/server.ts";
-import manifest from "./fresh.gen.ts";
-import config from "./fresh.config.ts";
-
-await start(manifest, config);
diff --git a/filter/RunningLogs.txt b/ml/filter/RunningLogs.txt
similarity index 100%
rename from filter/RunningLogs.txt
rename to ml/filter/RunningLogs.txt
diff --git a/filter/checkpoint_conversion.py b/ml/filter/checkpoint_conversion.py
similarity index 100%
rename from filter/checkpoint_conversion.py
rename to ml/filter/checkpoint_conversion.py
diff --git a/filter/clean_dataset.py b/ml/filter/clean_dataset.py
similarity index 100%
rename from filter/clean_dataset.py
rename to ml/filter/clean_dataset.py
diff --git a/filter/dataset.py b/ml/filter/dataset.py
similarity index 100%
rename from filter/dataset.py
rename to ml/filter/dataset.py
diff --git a/filter/db_utils.py b/ml/filter/db_utils.py
similarity index 100%
rename from filter/db_utils.py
rename to ml/filter/db_utils.py
diff --git a/filter/embedding.py 
b/ml/filter/embedding.py similarity index 100% rename from filter/embedding.py rename to ml/filter/embedding.py diff --git a/filter/embedding_range.py b/ml/filter/embedding_range.py similarity index 100% rename from filter/embedding_range.py rename to ml/filter/embedding_range.py diff --git a/filter/embedding_visualization.py b/ml/filter/embedding_visualization.py similarity index 100% rename from filter/embedding_visualization.py rename to ml/filter/embedding_visualization.py diff --git a/filter/labeling_system.py b/ml/filter/labeling_system.py similarity index 100% rename from filter/labeling_system.py rename to ml/filter/labeling_system.py diff --git a/filter/model.py b/ml/filter/model.py similarity index 100% rename from filter/model.py rename to ml/filter/model.py diff --git a/filter/modelV3_10.py b/ml/filter/modelV3_10.py similarity index 100% rename from filter/modelV3_10.py rename to ml/filter/modelV3_10.py diff --git a/filter/modelV3_12.py b/ml/filter/modelV3_12.py similarity index 100% rename from filter/modelV3_12.py rename to ml/filter/modelV3_12.py diff --git a/filter/modelV3_15.py b/ml/filter/modelV3_15.py similarity index 100% rename from filter/modelV3_15.py rename to ml/filter/modelV3_15.py diff --git a/filter/modelV6_0.py b/ml/filter/modelV6_0.py similarity index 100% rename from filter/modelV6_0.py rename to ml/filter/modelV6_0.py diff --git a/filter/onnx_export.py b/ml/filter/onnx_export.py similarity index 100% rename from filter/onnx_export.py rename to ml/filter/onnx_export.py diff --git a/filter/predict.py b/ml/filter/predict.py similarity index 100% rename from filter/predict.py rename to ml/filter/predict.py diff --git a/filter/quantize.py b/ml/filter/quantize.py similarity index 100% rename from filter/quantize.py rename to ml/filter/quantize.py diff --git a/filter/tag.py b/ml/filter/tag.py similarity index 100% rename from filter/tag.py rename to ml/filter/tag.py diff --git a/filter/test.py b/ml/filter/test.py similarity index 100% rename from filter/test.py rename to ml/filter/test.py diff --git a/filter/train.py b/ml/filter/train.py similarity index 100% rename from filter/train.py rename to ml/filter/train.py diff --git a/lab/.gitignore b/ml/lab/.gitignore similarity index 100% rename from lab/.gitignore rename to ml/lab/.gitignore diff --git a/lab/align-pipeline.md b/ml/lab/align-pipeline.md similarity index 100% rename from lab/align-pipeline.md rename to ml/lab/align-pipeline.md diff --git a/lab/mmsAlignment/align2LRC.py b/ml/lab/mmsAlignment/align2LRC.py similarity index 100% rename from lab/mmsAlignment/align2LRC.py rename to ml/lab/mmsAlignment/align2LRC.py diff --git a/lab/mmsAlignment/alignWithMMS.py b/ml/lab/mmsAlignment/alignWithMMS.py similarity index 100% rename from lab/mmsAlignment/alignWithMMS.py rename to ml/lab/mmsAlignment/alignWithMMS.py diff --git a/lab/mmsAlignment/splitSong.py b/ml/lab/mmsAlignment/splitSong.py similarity index 100% rename from lab/mmsAlignment/splitSong.py rename to ml/lab/mmsAlignment/splitSong.py diff --git a/lab/utils/audio.py b/ml/lab/utils/audio.py similarity index 100% rename from lab/utils/audio.py rename to ml/lab/utils/audio.py diff --git a/lab/utils/cleanTempDir.py b/ml/lab/utils/cleanTempDir.py similarity index 100% rename from lab/utils/cleanTempDir.py rename to ml/lab/utils/cleanTempDir.py diff --git a/lab/utils/ttml.py b/ml/lab/utils/ttml.py similarity index 100% rename from lab/utils/ttml.py rename to ml/lab/utils/ttml.py diff --git a/lab/whisperAlignment/align2srt.py 
b/ml/lab/whisperAlignment/align2srt.py similarity index 100% rename from lab/whisperAlignment/align2srt.py rename to ml/lab/whisperAlignment/align2srt.py diff --git a/lab/whisperAlignment/alignWithGroup.py b/ml/lab/whisperAlignment/alignWithGroup.py similarity index 100% rename from lab/whisperAlignment/alignWithGroup.py rename to ml/lab/whisperAlignment/alignWithGroup.py diff --git a/lab/whisperAlignment/splitGroups.py b/ml/lab/whisperAlignment/splitGroups.py similarity index 100% rename from lab/whisperAlignment/splitGroups.py rename to ml/lab/whisperAlignment/splitGroups.py diff --git a/lab/whisperAlignment/srt2lrc.py b/ml/lab/whisperAlignment/srt2lrc.py similarity index 100% rename from lab/whisperAlignment/srt2lrc.py rename to ml/lab/whisperAlignment/srt2lrc.py diff --git a/pred/count.py b/ml/pred/count.py similarity index 100% rename from pred/count.py rename to ml/pred/count.py diff --git a/pred/crawler.py b/ml/pred/crawler.py similarity index 100% rename from pred/crawler.py rename to ml/pred/crawler.py diff --git a/pred/dataset.py b/ml/pred/dataset.py similarity index 100% rename from pred/dataset.py rename to ml/pred/dataset.py diff --git a/pred/export_onnx.py b/ml/pred/export_onnx.py similarity index 100% rename from pred/export_onnx.py rename to ml/pred/export_onnx.py diff --git a/pred/inference.py b/ml/pred/inference.py similarity index 100% rename from pred/inference.py rename to ml/pred/inference.py diff --git a/pred/model.py b/ml/pred/model.py similarity index 100% rename from pred/model.py rename to ml/pred/model.py diff --git a/pred/train.py b/ml/pred/train.py similarity index 100% rename from pred/train.py rename to ml/pred/train.py diff --git a/packages/backend/deno.json b/packages/backend/deno.json new file mode 100644 index 0000000..e69de29 diff --git a/packages/core/deno.json b/packages/core/deno.json new file mode 100644 index 0000000..e69de29 diff --git a/lib/db/allData.ts b/packages/crawler/db/allData.ts similarity index 58% rename from lib/db/allData.ts rename to packages/crawler/db/allData.ts index 8e30780..461cb69 100644 --- a/lib/db/allData.ts +++ b/packages/crawler/db/allData.ts @@ -1,21 +1,24 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { AllDataType, BiliUserType } from "lib/db/schema.d.ts"; -import { modelVersion } from "lib/ml/filter_inference.ts"; +import { AllDataType, BiliUserType } from "db/schema.d.ts"; +import Akari from "ml/akari.ts"; export async function videoExistsInAllData(client: Client, aid: number) { - return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid]) + return await client.queryObject<{ exists: boolean }>( + `SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`, + [aid], + ) .then((result) => result.rows[0].exists); } export async function userExistsInBiliUsers(client: Client, uid: number) { - return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [ + return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [ uid, ]); } export async function getUnlabelledVideos(client: Client) { const queryResult = await client.queryObject<{ aid: number }>( - `SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`, + `SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`, ); return queryResult.rows.map((row) => row.aid); } @@ -23,20 +26,20 @@ 
export async function getUnlabelledVideos(client: Client) { export async function insertVideoLabel(client: Client, aid: number, label: number) { return await client.queryObject( `INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`, - [aid, label, modelVersion], + [aid, label, Akari.getModelVersion()], ); } export async function getVideoInfoFromAllData(client: Client, aid: number) { const queryResult = await client.queryObject( - `SELECT * FROM all_data WHERE aid = $1`, + `SELECT * FROM bilibili_metadata WHERE aid = $1`, [aid], ); const row = queryResult.rows[0]; let authorInfo = ""; if (row.uid && await userExistsInBiliUsers(client, row.uid)) { const q = await client.queryObject( - `SELECT * FROM bili_user WHERE uid = $1`, + `SELECT * FROM bilibili_user WHERE uid = $1`, [row.uid], ); const userRow = q.rows[0]; @@ -56,8 +59,8 @@ export async function getUnArchivedBiliUsers(client: Client) { const queryResult = await client.queryObject<{ uid: number }>( ` SELECT ad.uid - FROM all_data ad - LEFT JOIN bili_user bu ON ad.uid = bu.uid + FROM bilibili_metadata ad + LEFT JOIN bilibili_user bu ON ad.uid = bu.uid WHERE bu.uid IS NULL; `, [], @@ -65,3 +68,20 @@ export async function getUnArchivedBiliUsers(client: Client) { const rows = queryResult.rows; return rows.map((row) => row.uid); } + +export async function setBiliVideoStatus(client: Client, aid: number, status: number) { + return await client.queryObject( + `UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`, + [status, aid], + ); +} + +export async function getBiliVideoStatus(client: Client, aid: number) { + const queryResult = await client.queryObject<{ status: number }>( + `SELECT status FROM bilibili_metadata WHERE aid = $1`, + [aid], + ); + const rows = queryResult.rows; + if (rows.length === 0) return 0; + return rows[0].status; +} diff --git a/lib/db/init.ts b/packages/crawler/db/init.ts similarity index 72% rename from lib/db/init.ts rename to packages/crawler/db/init.ts index d206872..a1835b0 100644 --- a/lib/db/init.ts +++ b/packages/crawler/db/init.ts @@ -1,5 +1,5 @@ import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { postgresConfig } from "lib/db/pgConfig.ts"; +import { postgresConfig } from "db/pgConfig.ts"; const pool = new Pool(postgresConfig, 12); diff --git a/lib/db/pgConfig.ts b/packages/crawler/db/pgConfig.ts similarity index 100% rename from lib/db/pgConfig.ts rename to packages/crawler/db/pgConfig.ts diff --git a/lib/db/redis.ts b/packages/crawler/db/redis.ts similarity index 100% rename from lib/db/redis.ts rename to packages/crawler/db/redis.ts diff --git a/lib/db/schema.d.ts b/packages/crawler/db/schema.d.ts similarity index 61% rename from lib/db/schema.d.ts rename to packages/crawler/db/schema.d.ts index d93f736..d030308 100644 --- a/lib/db/schema.d.ts +++ b/packages/crawler/db/schema.d.ts @@ -31,3 +31,25 @@ export interface VideoSnapshotType { aid: bigint; replies: number; } + +export interface LatestSnapshotType { + aid: number; + time: number; + views: number; + danmakus: number; + replies: number; + likes: number; + coins: number; + shares: number; + favorites: number; +} + +export interface SnapshotScheduleType { + id: number; + aid: number; + type?: string; + created_at: string; + started_at?: string; + finished_at?: string; + status: string; +} diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts new file mode 100644 index 0000000..ef8009d --- /dev/null +++ 
b/packages/crawler/db/snapshot.ts
@@ -0,0 +1,44 @@
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { LatestSnapshotType } from "db/schema.d.ts";
+
+export async function getVideosNearMilestone(client: Client) {
+    const queryResult = await client.queryObject<LatestSnapshotType>(`
+        SELECT ls.*
+        FROM latest_video_snapshot ls
+        INNER JOIN
+            songs s ON ls.aid = s.aid
+            AND s.deleted = false
+        WHERE
+            s.deleted = false AND (
+                (views >= 90000 AND views < 100000) OR
+                (views >= 900000 AND views < 1000000) OR
+                (views >= 9900000 AND views < 10000000)
+            )
+    `);
+    return queryResult.rows.map((row) => {
+        return {
+            ...row,
+            aid: Number(row.aid),
+        };
+    });
+}
+
+export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<LatestSnapshotType | null> {
+    const queryResult = await client.queryObject<LatestSnapshotType>(
+        `
+        SELECT *
+        FROM latest_video_snapshot
+        WHERE aid = $1
+    `,
+        [aid],
+    );
+    if (queryResult.rows.length === 0) {
+        return null;
+    }
+    return queryResult.rows.map((row) => {
+        return {
+            ...row,
+            aid: Number(row.aid),
+            time: new Date(row.time).getTime(),
+        };
+    })[0];
+}
diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts
new file mode 100644
index 0000000..b98f900
--- /dev/null
+++ b/packages/crawler/db/snapshotSchedule.ts
@@ -0,0 +1,309 @@
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
+import { SnapshotScheduleType } from "./schema.d.ts";
+import logger from "log/logger.ts";
+import { MINUTE } from "$std/datetime/constants.ts";
+import { redis } from "db/redis.ts";
+import { Redis } from "ioredis";
+
+const REDIS_KEY = "cvsa:snapshot_window_counts";
+
+function getCurrentWindowIndex(): number {
+    const now = new Date();
+    const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
+    const currentWindow = Math.floor(minutesSinceMidnight / 5);
+    return currentWindow;
+}
+
+export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) {
+    const now = new Date();
+    const startTime = now.getTime();
+
+    const result = await client.queryObject<{ window_start: Date; count: number }>`
+        SELECT
+            date_trunc('hour', started_at) +
+            (EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
+            COUNT(*) AS count
+        FROM snapshot_schedule
+        WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
+        GROUP BY 1
+        ORDER BY window_start
+    `;
+
+    await redisClient.del(REDIS_KEY);
+
+    const currentWindow = getCurrentWindowIndex();
+
+    for (const row of result.rows) {
+        const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
+        const offset = (currentWindow + targetOffset);
+        if (offset >= 0) {
+            await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
+        }
+    }
+}
+
+export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
+    await refreshSnapshotWindowCounts(client, redisClient);
+    setInterval(async () => {
+        await refreshSnapshotWindowCounts(client, redisClient);
+    }, 5 * MINUTE);
+}
+
+async function getWindowCount(redisClient: Redis, offset: number): Promise<number> {
+    const count = await redisClient.hget(REDIS_KEY, offset.toString());
+    return count ? 
parseInt(count, 10) : 0; +} + +export async function snapshotScheduleExists(client: Client, id: number) { + const res = await client.queryObject<{ id: number }>( + `SELECT id FROM snapshot_schedule WHERE id = $1`, + [id], + ); + return res.rows.length > 0; +} + +export async function videoHasActiveSchedule(client: Client, aid: number) { + const res = await client.queryObject<{ status: string }>( + `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`, + [aid], + ); + return res.rows.length > 0; +} + +export async function videoHasProcessingSchedule(client: Client, aid: number) { + const res = await client.queryObject<{ status: string }>( + `SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`, + [aid], + ); + return res.rows.length > 0; +} + +export async function bulkGetVideosWithoutProcessingSchedules(client: Client, aids: number[]) { + const res = await client.queryObject<{ aid: number }>( + `SELECT aid FROM snapshot_schedule WHERE aid = ANY($1) AND status != 'processing' GROUP BY aid`, + [aids], + ); + return res.rows.map((row) => row.aid); +} + +interface Snapshot { + created_at: number; + views: number; +} + +export async function findClosestSnapshot( + client: Client, + aid: number, + targetTime: Date, +): Promise { + const query = ` + SELECT created_at, views + FROM video_snapshot + WHERE aid = $1 + ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) + LIMIT 1 + `; + const result = await client.queryObject<{ created_at: string; views: number }>( + query, + [aid, targetTime.toISOString()], + ); + if (result.rows.length === 0) return null; + const row = result.rows[0]; + return { + created_at: new Date(row.created_at).getTime(), + views: row.views, + }; +} + +export async function findSnapshotBefore( + client: Client, + aid: number, + targetTime: Date, +): Promise { + const query = ` + SELECT created_at, views + FROM video_snapshot + WHERE aid = $1 + AND created_at <= $2::timestamptz + ORDER BY created_at DESC + LIMIT 1 + `; + const result = await client.queryObject<{ created_at: string; views: number }>( + query, + [aid, targetTime.toISOString()], + ); + if (result.rows.length === 0) return null; + const row = result.rows[0]; + return { + created_at: new Date(row.created_at).getTime(), + views: row.views, + }; +} + +export async function hasAtLeast2Snapshots(client: Client, aid: number) { + const res = await client.queryObject<{ count: number }>( + `SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`, + [aid], + ); + return res.rows[0].count >= 2; +} + +export async function getLatestSnapshot(client: Client, aid: number): Promise { + const res = await client.queryObject<{ created_at: string; views: number }>( + `SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`, + [aid], + ); + if (res.rows.length === 0) return null; + const row = res.rows[0]; + return { + created_at: new Date(row.created_at).getTime(), + views: row.views, + }; +} + +/* + * Returns the number of snapshot schedules within the specified range. + * @param client The database client. + * @param start The start time of the range. (Timestamp in milliseconds) + * @param end The end time of the range. 
(Timestamp in milliseconds) + */ +export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) { + const startTimeString = formatTimestampToPsql(start); + const endTimeString = formatTimestampToPsql(end); + const query = ` + SELECT COUNT(*) FROM snapshot_schedule + WHERE started_at BETWEEN $1 AND $2 + AND status = 'pending' + `; + const res = await client.queryObject<{ count: number }>(query, [startTimeString, endTimeString]); + return res.rows[0].count; +} + +/* + * Creates a new snapshot schedule record. + * @param client The database client. + * @param aid The aid of the video. + * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds) + */ +export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) { + if (await videoHasActiveSchedule(client, aid) && !force) return; + let adjustedTime = new Date(targetTime); + if (type !== "milestone" && type !== "new") { + adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); + } + logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); + return client.queryObject( + `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, + [aid, type, adjustedTime.toISOString()], + ); +} + +export async function bulkScheduleSnapshot(client: Client, aids: number[], type: string, targetTime: number, force: boolean = false) { + for (const aid of aids) { + await scheduleSnapshot(client, aid, type, targetTime, force); + } +} + +export async function adjustSnapshotTime( + expectedStartTime: Date, + allowedCounts: number = 1000, + redisClient: Redis, +): Promise { + const currentWindow = getCurrentWindowIndex(); + const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6; + + const initialOffset = currentWindow + Math.max(targetOffset, 0); + + let timePerIteration = 0; + const MAX_ITERATIONS = 2880; + let iters = 0; + const t = performance.now(); + for (let i = initialOffset; i < MAX_ITERATIONS; i++) { + iters++; + const offset = i; + const count = await getWindowCount(redisClient, offset); + + if (count < allowedCounts) { + await redisClient.hincrby(REDIS_KEY, offset.toString(), 1); + + const startPoint = new Date(); + startPoint.setHours(0, 0, 0, 0); + const startTime = startPoint.getTime(); + const windowStart = startTime + offset * 5 * MINUTE; + const randomDelay = Math.floor(Math.random() * 5 * MINUTE); + const delayedDate = new Date(windowStart + randomDelay); + const now = new Date(); + + if (delayedDate.getTime() < now.getTime()) { + const elapsed = performance.now() - t; + timePerIteration = elapsed / (i+1); + logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime"); + return now; + } + const elapsed = performance.now() - t; + timePerIteration = elapsed / (i+1); + logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime"); + return delayedDate; + } + } + const elapsed = performance.now() - t; + timePerIteration = elapsed / MAX_ITERATIONS; + logger.log(`${timePerIteration.toFixed(3)}ms * ${MAX_ITERATIONS} iterations`, "perf", "fn:adjustSnapshotTime"); + return expectedStartTime; +} + + +export async function getSnapshotsInNextSecond(client: Client) { + const query = ` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' + ORDER BY + CASE + WHEN type 
= 'milestone' THEN 0 + ELSE 1 + END, + started_at + LIMIT 10; + `; + const res = await client.queryObject(query, []); + return res.rows; +} + +export async function getBulkSnapshotsInNextSecond(client: Client) { + const query = ` + SELECT * + FROM snapshot_schedule + WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal' + ORDER BY started_at + LIMIT 1000; + `; + const res = await client.queryObject(query, []); + return res.rows; +} + +export async function setSnapshotStatus(client: Client, id: number, status: string) { + return await client.queryObject( + `UPDATE snapshot_schedule SET status = $2 WHERE id = $1`, + [id, status], + ); +} + +export async function bulkSetSnapshotStatus(client: Client, ids: number[], status: string) { + return await client.queryObject( + `UPDATE snapshot_schedule SET status = $2 WHERE id = ANY($1)`, + [ids, status], + ); +} + +export async function getVideosWithoutActiveSnapshotSchedule(client: Client) { + const query: string = ` + SELECT s.aid + FROM songs s + LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') + WHERE ss.aid IS NULL + `; + const res = await client.queryObject<{ aid: number }>(query, []); + return res.rows.map((r) => Number(r.aid)); +} diff --git a/lib/db/songs.ts b/packages/crawler/db/songs.ts similarity index 61% rename from lib/db/songs.ts rename to packages/crawler/db/songs.ts index 0d5a096..1bfa002 100644 --- a/lib/db/songs.ts +++ b/packages/crawler/db/songs.ts @@ -1,4 +1,5 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts"; export async function getNotCollectedSongs(client: Client) { const queryResult = await client.queryObject<{ aid: number }>(` @@ -22,8 +23,23 @@ export async function aidExistsInSongs(client: Client, aid: number) { FROM songs WHERE aid = $1 ); - `, + `, [aid], ); return queryResult.rows[0].exists; } + +export async function getSongsPublihsedAt(client: Client, aid: number) { + const queryResult = await client.queryObject<{ published_at: string }>( + ` + SELECT published_at + FROM songs + WHERE aid = $1; + `, + [aid], + ); + if (queryResult.rows.length === 0) { + return null; + } + return parseTimestampFromPsql(queryResult.rows[0].published_at); +} diff --git a/packages/crawler/deno.json b/packages/crawler/deno.json new file mode 100644 index 0000000..4f95bb9 --- /dev/null +++ b/packages/crawler/deno.json @@ -0,0 +1,50 @@ +{ + "name": "@cvsa/crawler", + "tasks": { + "crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts", + "crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts", + "check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx", + "manifest": "deno task cli manifest $(pwd)", + "start": "deno run -A --watch=static/,routes/ dev.ts", + "build": "deno run -A dev.ts build", + "preview": "deno run -A main.ts", + "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts", + "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", + "adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", + "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net 
./src/bullui.ts", + "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", + "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" + }, + "lint": { + "rules": { + "tags": ["recommended"] + } + }, + "imports": { + "@std/assert": "jsr:@std/assert@1", + "$std/": "https://deno.land/std@0.216.0/", + "@huggingface/transformers": "npm:@huggingface/transformers@3.0.0", + "bullmq": "npm:bullmq", + "mq/": "./mq/", + "db/": "./db/", + "log/": "./log/", + "net/": "./net/", + "ml/": "./ml/", + "utils/": "./utils/", + "ioredis": "npm:ioredis", + "@bull-board/api": "npm:@bull-board/api", + "@bull-board/express": "npm:@bull-board/express", + "express": "npm:express", + "src/": "./src/", + "onnxruntime": "npm:onnxruntime-node@1.19.2", + "chalk": "npm:chalk" + }, + "fmt": { + "useTabs": true, + "lineWidth": 120, + "indentWidth": 4, + "semiColons": true, + "proseWrap": "always" + }, + "exports": "./main.ts" +} \ No newline at end of file diff --git a/lib/log/logger.ts b/packages/crawler/log/logger.ts similarity index 100% rename from lib/log/logger.ts rename to packages/crawler/log/logger.ts diff --git a/lib/log/test.ts b/packages/crawler/log/test.ts similarity index 89% rename from lib/log/test.ts rename to packages/crawler/log/test.ts index 71c719c..ee5953c 100644 --- a/lib/log/test.ts +++ b/packages/crawler/log/test.ts @@ -1,4 +1,4 @@ -import logger from "lib/log/logger.ts"; +import logger from "log/logger.ts"; logger.error(Error("test error"), "test service"); logger.debug(`some string`); diff --git a/packages/crawler/main.ts b/packages/crawler/main.ts new file mode 100644 index 0000000..3cdc0f4 --- /dev/null +++ b/packages/crawler/main.ts @@ -0,0 +1,7 @@ +// DENO ASK ME TO EXPORT SOMETHING WHEN 'name' IS SPECIFIED +// AND IF I DON'T SPECIFY 'name', THE --filter FLAG IN `deno task` WON'T WORK. +// I DONT'T KNOW WHY +// SO HERE'S A PLACHOLDER EXPORT FOR DENO: +export const DENO = "FUCK YOU DENO"; +// Oh, maybe export the version is a good idea +export const VERSION = "1.0.13"; \ No newline at end of file diff --git a/packages/crawler/ml/akari.ts b/packages/crawler/ml/akari.ts new file mode 100644 index 0000000..69a7a5d --- /dev/null +++ b/packages/crawler/ml/akari.ts @@ -0,0 +1,107 @@ +import { AIManager } from "ml/manager.ts"; +import * as ort from "onnxruntime"; +import logger from "log/logger.ts"; +import { WorkerError } from "mq/schema.ts"; +import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; + +const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024"; +const onnxClassifierPath = "../../model/akari/3.17.onnx"; +const onnxEmbeddingPath = "../../model/embedding/model.onnx"; + +class AkariProto extends AIManager { + private tokenizer: PreTrainedTokenizer | null = null; + private readonly modelVersion = "3.17"; + + constructor() { + super(); + this.models = { + "classifier": onnxClassifierPath, + "embedding": onnxEmbeddingPath, + }; + } + + public override async init(): Promise { + await super.init(); + await this.initJinaTokenizer(); + } + + private tokenizerInitialized(): boolean { + return this.tokenizer !== null; + } + + private getTokenizer(): PreTrainedTokenizer { + if (!this.tokenizerInitialized()) { + throw new Error("Tokenizer is not initialized. 
Call init() first."); + } + return this.tokenizer!; + } + + private async initJinaTokenizer(): Promise { + if (this.tokenizerInitialized()) { + return; + } + try { + this.tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel); + logger.log("Tokenizer initialized", "ml"); + } catch (error) { + throw new WorkerError(error as Error, "ml", "fn:initTokenizer"); + } + } + + private async getJinaEmbeddings1024(texts: string[]): Promise { + const tokenizer = this.getTokenizer(); + const session = this.getModelSession("embedding"); + + const { input_ids } = await tokenizer(texts, { + add_special_tokens: false, + return_tensor: false, + }); + + const cumsum = (arr: number[]): number[] => + arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []); + + const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))]; + const flattened_input_ids = input_ids.flat(); + + const inputs = { + input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ + flattened_input_ids.length, + ]), + offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), + }; + + const { embeddings } = await session.run(inputs); + return Array.from(embeddings.data as Float32Array); + } + + private async runClassification(embeddings: number[]): Promise { + const session = this.getModelSession("classifier"); + const inputTensor = new ort.Tensor( + Float32Array.from(embeddings), + [1, 3, 1024], + ); + + const { logits } = await session.run({ channel_features: inputTensor }); + return this.softmax(logits.data as Float32Array); + } + + public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise { + const embeddings = await this.getJinaEmbeddings1024([ + title, + description, + tags, + ]); + const probabilities = await this.runClassification(embeddings); + if (aid) { + logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml"); + } + return probabilities.indexOf(Math.max(...probabilities)); + } + + public getModelVersion(): string { + return this.modelVersion; + } +} + +const Akari = new AkariProto(); +export default Akari; diff --git a/lib/ml/benchmark.ts b/packages/crawler/ml/benchmark.ts similarity index 94% rename from lib/ml/benchmark.ts rename to packages/crawler/ml/benchmark.ts index 0cfc193..3fc76ac 100644 --- a/lib/ml/benchmark.ts +++ b/packages/crawler/ml/benchmark.ts @@ -1,6 +1,12 @@ import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import * as ort from "onnxruntime"; -import { softmax } from "lib/ml/filter_inference.ts"; + +function softmax(logits: Float32Array): number[] { + const maxLogit = Math.max(...logits); + const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); + const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0); + return Array.from(exponents.map((exp) => exp / sumOfExponents)); +} // 配置参数 const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; diff --git a/packages/crawler/ml/manager.ts b/packages/crawler/ml/manager.ts new file mode 100644 index 0000000..42f783e --- /dev/null +++ b/packages/crawler/ml/manager.ts @@ -0,0 +1,37 @@ +import * as ort from "onnxruntime"; +import logger from "log/logger.ts"; +import { WorkerError } from "mq/schema.ts"; + +export class AIManager { + public sessions: { [key: string]: ort.InferenceSession } = {}; + public models: { [key: string]: string } = {}; + + constructor() { + } + + public async 
init() { + const modelKeys = Object.keys(this.models); + for (const key of modelKeys) { + try { + this.sessions[key] = await ort.InferenceSession.create(this.models[key]); + logger.log(`Model ${key} initialized`, "ml"); + } catch (error) { + throw new WorkerError(error as Error, "ml", "fn:init"); + } + } + } + + public getModelSession(key: string): ort.InferenceSession { + if (this.sessions[key] === undefined) { + throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession"); + } + return this.sessions[key]; + } + + public softmax(logits: Float32Array): number[] { + const maxLogit = Math.max(...logits); + const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); + const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0); + return Array.from(exponents.map((exp) => exp / sumOfExponents)); + } +} diff --git a/lib/ml/quant_benchmark.ts b/packages/crawler/ml/quant_benchmark.ts similarity index 94% rename from lib/ml/quant_benchmark.ts rename to packages/crawler/ml/quant_benchmark.ts index bcc5044..aab6308 100644 --- a/lib/ml/quant_benchmark.ts +++ b/packages/crawler/ml/quant_benchmark.ts @@ -1,6 +1,12 @@ import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers"; import * as ort from "onnxruntime"; -import { softmax } from "lib/ml/filter_inference.ts"; + +function softmax(logits: Float32Array): number[] { + const maxLogit = Math.max(...logits); + const exponents = logits.map((logit) => Math.exp(logit - maxLogit)); + const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0); + return Array.from(exponents.map((exp) => exp / sumOfExponents)); +} // 配置参数 const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024"; diff --git a/lib/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts similarity index 69% rename from lib/mq/exec/classifyVideo.ts rename to packages/crawler/mq/exec/classifyVideo.ts index 3541892..c813b7b 100644 --- a/lib/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -1,12 +1,14 @@ import { Job } from "bullmq"; -import { db } from "lib/db/init.ts"; -import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts"; -import { classifyVideo } from "lib/ml/filter_inference.ts"; -import { ClassifyVideoQueue } from "lib/mq/index.ts"; -import logger from "lib/log/logger.ts"; -import { lockManager } from "lib/mq/lockManager.ts"; -import { aidExistsInSongs } from "lib/db/songs.ts"; -import { insertIntoSongs } from "lib/mq/task/collectSongs.ts"; +import { db } from "db/init.ts"; +import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts"; +import Akari from "ml/akari.ts"; +import { ClassifyVideoQueue } from "mq/index.ts"; +import logger from "log/logger.ts"; +import { lockManager } from "mq/lockManager.ts"; +import { aidExistsInSongs } from "db/songs.ts"; +import { insertIntoSongs } from "mq/task/collectSongs.ts"; +import { scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { MINUTE } from "$std/datetime/constants.ts"; export const classifyVideoWorker = async (job: Job) => { const client = await db.connect(); @@ -19,14 +21,15 @@ export const classifyVideoWorker = async (job: Job) => { const title = videoInfo.title?.trim() || "untitled"; const description = videoInfo.description?.trim() || "N/A"; const tags = videoInfo.tags?.trim() || "empty"; - const label = await classifyVideo(title, description, tags, aid); + const label = await Akari.classifyVideo(title, 
description, tags, aid); if (label == -1) { logger.warn(`Failed to classify video ${aid}`, "ml"); } await insertVideoLabel(client, aid, label); const exists = await aidExistsInSongs(client, aid); - if (!exists) { + if (!exists && label !== 0) { + await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true); await insertIntoSongs(client, aid); } diff --git a/lib/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts similarity index 74% rename from lib/mq/exec/getLatestVideos.ts rename to packages/crawler/mq/exec/getLatestVideos.ts index 34b5d1a..7a19738 100644 --- a/lib/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -1,8 +1,8 @@ import { Job } from "bullmq"; -import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts"; -import { db } from "lib/db/init.ts"; -import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts"; -import { collectSongs } from "lib/mq/task/collectSongs.ts"; +import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; +import { db } from "db/init.ts"; +import { insertVideoInfo } from "mq/task/getVideoDetails.ts"; +import { collectSongs } from "mq/task/collectSongs.ts"; export const getLatestVideosWorker = async (_job: Job): Promise<void> => { const client = await db.connect(); diff --git a/packages/crawler/mq/exec/snapshotTick.ts b/packages/crawler/mq/exec/snapshotTick.ts new file mode 100644 index 0000000..876e05a --- /dev/null +++ b/packages/crawler/mq/exec/snapshotTick.ts @@ -0,0 +1,397 @@ +import { Job } from "bullmq"; +import { db } from "db/init.ts"; +import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts"; +import { + bulkGetVideosWithoutProcessingSchedules, + bulkScheduleSnapshot, + bulkSetSnapshotStatus, + findClosestSnapshot, + findSnapshotBefore, + getLatestSnapshot, + getSnapshotsInNextSecond, + getVideosWithoutActiveSnapshotSchedule, + hasAtLeast2Snapshots, + scheduleSnapshot, + setSnapshotStatus, + snapshotScheduleExists, + videoHasProcessingSchedule, + getBulkSnapshotsInNextSecond +} from "db/snapshotSchedule.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts"; +import logger from "log/logger.ts"; +import { SnapshotQueue } from "mq/index.ts"; +import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; +import { NetSchedulerError } from "mq/scheduler.ts"; +import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts"; +import { truncate } from "utils/truncate.ts"; +import { lockManager } from "mq/lockManager.ts"; +import { getSongsPublihsedAt } from "db/songs.ts"; +import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts"; + +const priorityMap: { [key: string]: number } = { + "milestone": 1, + "normal": 3, +}; + +const snapshotTypeToTaskMap: { [key: string]: string } = { + "milestone": "snapshotMilestoneVideo", + "normal": "snapshotVideo", + "new": "snapshotMilestoneVideo", +}; + +export const bulkSnapshotTickWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const schedules = await getBulkSnapshotsInNextSecond(client); + const count = schedules.length; + const groups = Math.ceil(count / 30); + for (let i = 0; i < groups; i++) { + const group = schedules.slice(i * 30, (i + 1) * 30); + const aids = group.map((schedule) => Number(schedule.aid)); + const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids); + if (filteredAids.length === 0) continue; + await bulkSetSnapshotStatus(client, filteredAids,
"processing"); + const dataMap: { [key: number]: number } = {}; + for (const schedule of group) { + const id = Number(schedule.id); + dataMap[id] = Number(schedule.aid); + } + await SnapshotQueue.add("bulkSnapshotVideo", { + map: dataMap, + }, { priority: 3 }); + } + } catch (e) { + logger.error(e as Error); + } finally { + client.release(); + } +}; + +export const snapshotTickWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const schedules = await getSnapshotsInNextSecond(client); + for (const schedule of schedules) { + if (await videoHasProcessingSchedule(client, Number(schedule.aid))) { + return `ALREADY_PROCESSING`; + } + let priority = 3; + if (schedule.type && priorityMap[schedule.type]) { + priority = priorityMap[schedule.type]; + } + const aid = Number(schedule.aid); + await setSnapshotStatus(client, schedule.id, "processing"); + await SnapshotQueue.add("snapshotVideo", { + aid: aid, + id: Number(schedule.id), + type: schedule.type ?? "normal", + }, { priority }); + } + } catch (e) { + logger.error(e as Error); + } finally { + client.release(); + } +}; + +export const closetMilestone = (views: number) => { + if (views < 100000) return 100000; + if (views < 1000000) return 1000000; + return 10000000; +}; + +const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); + +/* + * Returns the minimum ETA in hours for the next snapshot + * @param client - Postgres client + * @param aid - aid of the video + * @returns ETA in hours + */ +const getAdjustedShortTermETA = async (client: Client, aid: number) => { + const latestSnapshot = await getLatestSnapshot(client, aid); + // Immediately dispatch a snapshot if there is no snapshot yet + if (!latestSnapshot) return 0; + const snapshotsEnough = await hasAtLeast2Snapshots(client, aid); + if (!snapshotsEnough) return 0; + + const currentTimestamp = new Date().getTime(); + const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; + const DELTA = 0.00001; + let minETAHours = Infinity; + + for (const timeInterval of timeIntervals) { + const date = new Date(currentTimestamp - timeInterval); + const snapshot = await findClosestSnapshot(client, aid, date); + if (!snapshot) continue; + const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR; + const viewsDiff = latestSnapshot.views - snapshot.views; + if (viewsDiff <= 0) continue; + const speed = viewsDiff / (hoursDiff + DELTA); + const target = closetMilestone(latestSnapshot.views); + const viewsToIncrease = target - latestSnapshot.views; + const eta = viewsToIncrease / (speed + DELTA); + let factor = log(2.97 / log(viewsToIncrease + 1), 1.14); + factor = truncate(factor, 3, 100); + const adjustedETA = eta / factor; + if (adjustedETA < minETAHours) { + minETAHours = adjustedETA; + } + } + + if (isNaN(minETAHours)) { + minETAHours = Infinity; + } + + return minETAHours; +}; + +export const collectMilestoneSnapshotsWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const videos = await getVideosNearMilestone(client); + for (const video of videos) { + const aid = Number(video.aid); + const eta = await getAdjustedShortTermETA(client, aid); + if (eta > 72) continue; + const now = Date.now(); + const scheduledNextSnapshotDelay = eta * HOUR; + const maxInterval = 4 * HOUR; + const minInterval = 1 * SECOND; + const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); + const targetTime = now + delay; + await scheduleSnapshot(client, aid, "milestone", targetTime); + } + 
} catch (e) { + logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker"); + } finally { + client.release(); + } +}; + +const getRegularSnapshotInterval = async (client: Client, aid: number) => { + const now = Date.now(); + const date = new Date(now - 24 * HOUR); + let oldSnapshot = await findSnapshotBefore(client, aid, date); + if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date); + const latestSnapshot = await getLatestSnapshot(client, aid); + if (!oldSnapshot || !latestSnapshot) return 0; + if (oldSnapshot.created_at === latestSnapshot.created_at) return 0; + const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR; + if (hoursDiff < 8) return 24; + const viewsDiff = latestSnapshot.views - oldSnapshot.views; + if (viewsDiff === 0) return 72; + const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24; + if (speedPerDay < 6) return 36; + if (speedPerDay < 120) return 24; + if (speedPerDay < 320) return 12; + return 6; +}; + +export const regularSnapshotsWorker = async (_job: Job) => { + const client = await db.connect(); + const startedAt = Date.now(); + if (await lockManager.isLocked("dispatchRegularSnapshots")) { + logger.log("dispatchRegularSnapshots is already running", "mq"); + client.release(); + return; + } + await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); + try { + const aids = await getVideosWithoutActiveSnapshotSchedule(client); + for (const rawAid of aids) { + const aid = Number(rawAid); + const latestSnapshot = await getLatestVideoSnapshot(client, aid); + const now = Date.now(); + const lastSnapshotedAt = latestSnapshot?.time ?? now; + const interval = await getRegularSnapshotInterval(client, aid); + logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); + const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK); + await scheduleSnapshot(client, aid, "normal", targetTime); + if (now - startedAt > 25 * MINUTE) { + return; + } + } + } catch (e) { + logger.error(e as Error, "mq", "fn:regularSnapshotsWorker"); + } finally { + lockManager.releaseLock("dispatchRegularSnapshots"); + client.release(); + } +}; + +export const takeBulkSnapshotForVideosWorker = async (job: Job) => { + const dataMap: { [key: number]: number } = job.data.map; + const ids = Object.keys(dataMap).map((id) => Number(id)); + const aidsToFetch: number[] = []; + const client = await db.connect(); + try { + for (const id of ids) { + const aid = Number(dataMap[id]); + const exists = await snapshotScheduleExists(client, id); + if (!exists) { + continue; + } + aidsToFetch.push(aid); + } + const data = await bulkGetVideoStats(aidsToFetch); + if (typeof data === "number") { + await bulkSetSnapshotStatus(client, ids, "failed"); + await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND); + return `GET_BILI_STATUS_${data}`; + } + for (const video of data) { + const aid = video.id; + const stat = video.cnt_info; + const views = stat.play; + const danmakus = stat.danmaku; + const replies = stat.reply; + const likes = stat.thumb_up; + const coins = stat.coin; + const shares = stat.share; + const favorites = stat.collect; + const query: string = ` + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `; + await client.queryObject( + query, + [aid, views, danmakus, replies, likes, coins, shares, favorites], + ); + + logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", 
"fn:takeBulkSnapshotForVideosWorker"); + } + await bulkSetSnapshotStatus(client, ids, "completed"); + for (const aid of aidsToFetch) { + const interval = await getRegularSnapshotInterval(client, aid); + logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); + await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR); + } + return `DONE`; + } catch (e) { + if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { + logger.warn( + `No available proxy for bulk request now.`, + "mq", + "fn:takeBulkSnapshotForVideosWorker", + ); + await bulkSetSnapshotStatus(client, ids, "completed"); + await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE); + return; + } + logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker"); + await bulkSetSnapshotStatus(client, ids, "failed"); + } + finally { + client.release(); + } +}; + +export const takeSnapshotForVideoWorker = async (job: Job) => { + const id = job.data.id; + const aid = Number(job.data.aid); + const type = job.data.type; + const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo"; + const client = await db.connect(); + const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; + const exists = await snapshotScheduleExists(client, id); + if (!exists) { + client.release(); + return; + } + const status = await getBiliVideoStatus(client, aid); + if (status !== 0) { + client.release(); + return `REFUSE_WORKING_BILI_STATUS_${status}`; + } + try { + await setSnapshotStatus(client, id, "processing"); + const stat = await insertVideoSnapshot(client, aid, task); + if (typeof stat === "number") { + await setBiliVideoStatus(client, aid, stat); + await setSnapshotStatus(client, id, "completed"); + return `GET_BILI_STATUS_${stat}`; + } + await setSnapshotStatus(client, id, "completed"); + if (type === "normal") { + const interval = await getRegularSnapshotInterval(client, aid); + logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); + await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR); + return `DONE`; + } else if (type === "new") { + const publihsedAt = await getSongsPublihsedAt(client, aid); + const timeSincePublished = stat.time - publihsedAt!; + const viewsPerHour = stat.views / timeSincePublished * HOUR; + if (timeSincePublished > 48 * HOUR) { + return `DONE`; + } + if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) { + return `DONE`; + } + let intervalMins = 240; + if (viewsPerHour > 50) { + intervalMins = 120; + } + if (viewsPerHour > 100) { + intervalMins = 60; + } + if (viewsPerHour > 1000) { + intervalMins = 15; + } + await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true); + } + if (type !== "milestone") return `DONE`; + const eta = await getAdjustedShortTermETA(client, aid); + if (eta > 72) return "ETA_TOO_LONG"; + const now = Date.now(); + const targetTime = now + eta * HOUR; + await scheduleSnapshot(client, aid, type, targetTime); + return `DONE`; + } catch (e) { + if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { + logger.warn( + `No available proxy for aid ${job.data.aid}.`, + "mq", + "fn:takeSnapshotForVideoWorker", + ); + await setSnapshotStatus(client, id, "completed"); + await scheduleSnapshot(client, aid, type, Date.now() + retryInterval); + return; + } + logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + await setSnapshotStatus(client, id, "failed"); + } finally { + client.release(); + } +}; + +export 
const scheduleCleanupWorker = async (_job: Job) => { + const client = await db.connect(); + try { + const query = ` + SELECT id, aid, type + FROM snapshot_schedule + WHERE status IN ('pending', 'processing') + AND started_at < NOW() - INTERVAL '5 minutes' + `; + const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query); + if (rows.length === 0) return; + for (const row of rows) { + const id = Number(row.id); + const aid = Number(row.aid); + const type = row.type; + await setSnapshotStatus(client, id, "timeout"); + await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND); + logger.log( + `Schedule ${id} has no response received for 5 minutes, rescheduled.`, + "mq", + "fn:scheduleCleanupWorker", + ); + } + } catch (e) { + logger.error(e as Error, "mq", "fn:scheduleCleanupWorker"); + } finally { + client.release(); + } +}; diff --git a/packages/crawler/mq/executors.ts b/packages/crawler/mq/executors.ts new file mode 100644 index 0000000..1e486e1 --- /dev/null +++ b/packages/crawler/mq/executors.ts @@ -0,0 +1 @@ +export * from "mq/exec/getLatestVideos.ts"; diff --git a/lib/mq/index.ts b/packages/crawler/mq/index.ts similarity index 100% rename from lib/mq/index.ts rename to packages/crawler/mq/index.ts diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts new file mode 100644 index 0000000..4a302d1 --- /dev/null +++ b/packages/crawler/mq/init.ts @@ -0,0 +1,67 @@ +import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts"; +import logger from "log/logger.ts"; +import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts"; +import { db } from "db/init.ts"; +import { redis } from "db/redis.ts"; + +export async function initMQ() { + const client = await db.connect(); + try { + await initSnapshotWindowCounts(client, redis); + + await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { + every: 1 * MINUTE, + immediately: true, + }); + + await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { + every: 5 * MINUTE, + immediately: true, + }); + + await LatestVideosQueue.upsertJobScheduler("collectSongs", { + every: 3 * MINUTE, + immediately: true, + }); + + await SnapshotQueue.upsertJobScheduler("snapshotTick", { + every: 1 * SECOND, + immediately: true, + }, { + opts: { + removeOnComplete: 1, + removeOnFail: 1, + }, + }); + + await SnapshotQueue.upsertJobScheduler("bulkSnapshotTick", { + every: 15 * SECOND, + immediately: true, + }, { + opts: { + removeOnComplete: 1, + removeOnFail: 1, + }, + }); + + await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", { + every: 5 * MINUTE, + immediately: true, + }); + + await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", { + every: 30 * MINUTE, + immediately: true, + }); + + await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { + every: 30 * MINUTE, + immediately: true, + }); + + logger.log("Message queue initialized."); + } finally { + client.release(); + } +} diff --git a/lib/mq/lockManager.ts b/packages/crawler/mq/lockManager.ts similarity index 97% rename from lib/mq/lockManager.ts rename to packages/crawler/mq/lockManager.ts index f83b148..e0c7f8a 100644 --- a/lib/mq/lockManager.ts +++ b/packages/crawler/mq/lockManager.ts @@ -1,5 +1,5 @@ import { Redis } from "ioredis"; -import { redis } from "lib/db/redis.ts"; +import { redis } from "db/redis.ts"; class LockManager { private redis: Redis; diff --git a/lib/mq/rateLimiter.ts b/packages/crawler/mq/rateLimiter.ts 
similarity index 96% rename from lib/mq/rateLimiter.ts rename to packages/crawler/mq/rateLimiter.ts index 7f62547..aba7c3e 100644 --- a/lib/mq/rateLimiter.ts +++ b/packages/crawler/mq/rateLimiter.ts @@ -1,4 +1,4 @@ -import { SlidingWindow } from "lib/mq/slidingWindow.ts"; +import { SlidingWindow } from "mq/slidingWindow.ts"; export interface RateLimiterConfig { window: SlidingWindow; diff --git a/lib/mq/scheduler.ts b/packages/crawler/mq/scheduler.ts similarity index 85% rename from lib/mq/scheduler.ts rename to packages/crawler/mq/scheduler.ts index 00c3a4e..0e8c036 100644 --- a/lib/mq/scheduler.ts +++ b/packages/crawler/mq/scheduler.ts @@ -1,10 +1,9 @@ -import logger from "lib/log/logger.ts"; -import { RateLimiter, RateLimiterConfig } from "lib/mq/rateLimiter.ts"; -import { SlidingWindow } from "lib/mq/slidingWindow.ts"; -import { redis } from "lib/db/redis.ts"; +import logger from "log/logger.ts"; +import { RateLimiter, RateLimiterConfig } from "mq/rateLimiter.ts"; +import { SlidingWindow } from "mq/slidingWindow.ts"; +import { redis } from "db/redis.ts"; import Redis from "ioredis"; import { SECOND } from "$std/datetime/constants.ts"; -import { randomUUID } from "node:crypto"; interface Proxy { type: string; @@ -21,7 +20,7 @@ interface ProxiesMap { } type NetSchedulerErrorCode = - | "NO_AVAILABLE_PROXY" + | "NO_PROXY_AVAILABLE" | "PROXY_RATE_LIMITED" | "PROXY_NOT_FOUND" | "FETCH_ERROR" @@ -143,10 +142,10 @@ class NetScheduler { * @param {string} method - The HTTP method to use for the request. Default is "GET". * @returns {Promise} - A promise that resolves to the response body. * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - No available proxy currently: with error code NO_AVAILABLE_PROXY - * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED - * - The native `fetch` function threw an error: with error code FETCH_ERROR - * - The proxy type is not supported: with error code NOT_IMPLEMENTED + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` */ async request<R>(url: string, task: string, method: string = "GET"): Promise<R> { // find an available proxy @@ -156,7 +155,7 @@ class NetScheduler { return await this.proxyRequest<R>(url, proxyName, task, method); } } - throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY"); + throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE"); } /* * Make a request to the specified URL with the specified proxy * @param {string} url - The URL to request. @@ -168,10 +167,11 @@ class NetScheduler { * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false. * @returns {Promise} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases: - * - Proxy not found: with error code PROXY_NOT_FOUND - * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED - * - The native `fetch` function threw an error: with error code FETCH_ERROR - * - The proxy type is not supported: with error code NOT_IMPLEMENTED + * - Proxy not found: with error code `PROXY_NOT_FOUND` + * - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` */ async proxyRequest<R>( url: string, @@ -255,8 +255,6 @@ class NetScheduler { } private async alicloudFcRequest<R>(url: string, region: string): Promise<R> { - let rawOutput: null | Uint8Array = null; - let rawErr: null | Uint8Array = null; try { const decoder = new TextDecoder(); const output = await new Deno.Command("aliyun", { @@ -280,15 +278,9 @@ class NetScheduler { `CVSA-${region}`, ], }).output(); - rawOutput = output.stdout; - rawErr = output.stderr; const out = decoder.decode(output.stdout); const rawData = JSON.parse(out); if (rawData.statusCode !== 200) { - const fileId = randomUUID(); - await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout); - await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr); - logger.log(`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest") throw new NetSchedulerError( `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`, "ALICLOUD_PROXY_ERR", ); } return JSON.parse(JSON.parse(rawData.body)) as R; } } catch (e) { - if (rawOutput !== null || rawErr !== null) { - const fileId = randomUUID(); - rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput); - rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr); - logger.log(`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, "net", "fn:alicloudFcRequest") - } logger.error(e as Error, "net", "fn:alicloudFcRequest"); throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e); } @@ -347,6 +333,18 @@ const biliLimiterConfig: RateLimiterConfig[] = [ }, ]; +const bili_test = biliLimiterConfig.map((config) => ({ ...config })); +bili_test[0].max = 10; +bili_test[1].max = 36; +bili_test[2].max = 150; +bili_test[3].max = 1000; + +const bili_strict = biliLimiterConfig.map((config) => ({ ...config })); +bili_strict[0].max = 1; +bili_strict[1].max = 4; +bili_strict[2].max = 12; +bili_strict[3].max = 100; + /* Execution order for setup: @@ -361,7 +359,8 @@ Execution order for setup: - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies. - Depends on tasks and proxies being defined to apply limiters correctly. 4. setProviderLimiter(providerName, config): - - Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. + - Call after addProxy and addTask. + - It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to. In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
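
To make the documented order concrete, here is a minimal sketch of wiring one extra provider through the scheduler. The provider name `example`, the task name `exampleTask`, the proxy name `example-proxy`, and the limiter values are illustrative assumptions, not part of this change; the `addProxy` call is only indicated in a comment because its signature does not appear in this diff.

```ts
// Hypothetical wiring that follows the order documented above:
// addProxy -> addTask -> setTaskLimiter / setProviderLimiter.
const exampleLimiterConfig: RateLimiterConfig[] = [
	{ window: new SlidingWindow(redis, 1), max: 5 }, // at most 5 events per 1-unit window
	{ window: new SlidingWindow(redis, 30), max: 100 }, // at most 100 events per 30-unit window
];

// 1. netScheduler.addProxy(...) would come first (signature not shown in this diff).
netScheduler.addTask("exampleTask", "example", ["example-proxy"]); // 2. bind a task to a provider and its proxies
netScheduler.setTaskLimiter("exampleTask", exampleLimiterConfig); // 3. limits scoped to this task
netScheduler.setProviderLimiter("example", exampleLimiterConfig); // 4. limits shared by all tasks of the provider
```
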
@@ -377,7 +376,15 @@ for (const region of regions) { netScheduler.addTask("getVideoInfo", "bilibili", "all"); netScheduler.addTask("getLatestVideos", "bilibili", "all"); netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`)); -netScheduler.addTask("snapshotVideo", "bilibili", [ +netScheduler.addTask("snapshotVideo", "bili_test", [ + "alicloud-qingdao", + "alicloud-shanghai", + "alicloud-zhangjiakou", + "alicloud-chengdu", + "alicloud-shenzhen", + "alicloud-hohhot", +]); +netScheduler.addTask("bulkSnapshot", "bili_strict", [ "alicloud-qingdao", "alicloud-shanghai", "alicloud-zhangjiakou", @@ -388,7 +395,10 @@ netScheduler.addTask("snapshotVideo", "bilibili", [ netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig); netScheduler.setTaskLimiter("getLatestVideos", null); netScheduler.setTaskLimiter("snapshotMilestoneVideo", null); -netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig); +netScheduler.setTaskLimiter("snapshotVideo", null); +netScheduler.setTaskLimiter("bulkSnapshot", null); netScheduler.setProviderLimiter("bilibili", biliLimiterConfig); +netScheduler.setProviderLimiter("bili_test", bili_test); +netScheduler.setProviderLimiter("bili_strict", bili_strict); export default netScheduler; diff --git a/lib/mq/schema.ts b/packages/crawler/mq/schema.ts similarity index 100% rename from lib/mq/schema.ts rename to packages/crawler/mq/schema.ts diff --git a/lib/mq/slidingWindow.ts b/packages/crawler/mq/slidingWindow.ts similarity index 100% rename from lib/mq/slidingWindow.ts rename to packages/crawler/mq/slidingWindow.ts diff --git a/lib/mq/task/collectSongs.ts b/packages/crawler/mq/task/collectSongs.ts similarity index 55% rename from lib/mq/task/collectSongs.ts rename to packages/crawler/mq/task/collectSongs.ts index 04e033d..389ca06 100644 --- a/lib/mq/task/collectSongs.ts +++ b/packages/crawler/mq/task/collectSongs.ts @@ -1,6 +1,8 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts"; -import logger from "lib/log/logger.ts"; +import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts"; +import logger from "log/logger.ts"; +import { scheduleSnapshot } from "db/snapshotSchedule.ts"; +import { MINUTE } from "$std/datetime/constants.ts"; export async function collectSongs(client: Client) { const aids = await getNotCollectedSongs(client); @@ -8,6 +10,7 @@ export async function collectSongs(client: Client) { const exists = await aidExistsInSongs(client, aid); if (exists) continue; await insertIntoSongs(client, aid); + await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true); logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs"); } } @@ -15,12 +18,11 @@ export async function collectSongs(client: Client) { export async function insertIntoSongs(client: Client, aid: number) { await client.queryObject( ` - INSERT INTO songs (aid, bvid, published_at, duration) + INSERT INTO songs (aid, published_at, duration) VALUES ( $1, - (SELECT bvid FROM all_data WHERE aid = $1), - (SELECT published_at FROM all_data WHERE aid = $1), - (SELECT duration FROM all_data WHERE aid = $1) + (SELECT published_at FROM bilibili_metadata WHERE aid = $1), + (SELECT duration FROM bilibili_metadata WHERE aid = $1) ) ON CONFLICT DO NOTHING `, diff --git a/lib/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts similarity index 58% rename from lib/mq/task/getVideoDetails.ts 
rename to packages/crawler/mq/task/getVideoDetails.ts index ead8dd0..9b675e5 100644 --- a/lib/mq/task/getVideoDetails.ts +++ b/packages/crawler/mq/task/getVideoDetails.ts @@ -1,9 +1,10 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { getVideoDetails } from "lib/net/getVideoDetails.ts"; -import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; -import logger from "lib/log/logger.ts"; -import { ClassifyVideoQueue } from "lib/mq/index.ts"; -import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts"; +import { getVideoDetails } from "net/getVideoDetails.ts"; +import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts"; +import logger from "log/logger.ts"; +import { ClassifyVideoQueue } from "mq/index.ts"; +import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts"; +import { HOUR, SECOND } from "$std/datetime/constants.ts"; export async function insertVideoInfo(client: Client, aid: number) { const videoExists = await videoExistsInAllData(client, aid); @@ -18,25 +19,25 @@ export async function insertVideoInfo(client: Client, aid: number) { const desc = data.View.desc; const uid = data.View.owner.mid; const tags = data.Tags - .filter((tag) => tag.tag_type in ["old_channel", "topic"]) + .filter((tag) => ["old_channel", "topic"].includes(tag.tag_type)) + .map((tag) => tag.tag_name).join(","); const title = data.View.title; - const published_at = formatTimestampToPsql(data.View.pubdate); + const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const duration = data.View.duration; await client.queryObject( - `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration) + `INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, [aid, bvid, desc, uid, tags, title, published_at, duration], ); const userExists = await userExistsInBiliUsers(client, aid); if (!userExists) { await client.queryObject( - `INSERT INTO bili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`, + `INSERT INTO bilibili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`, [uid, data.View.owner.name, data.Card.card.sign, data.Card.follower], ); } else { await client.queryObject( - `UPDATE bili_user SET fans = $1 WHERE uid = $2`, + `UPDATE bilibili_user SET fans = $1 WHERE uid = $2`, [data.Card.follower, uid], ); } diff --git a/packages/crawler/mq/task/getVideoStats.ts b/packages/crawler/mq/task/getVideoStats.ts new file mode 100644 index 0000000..34b6c42 --- /dev/null +++ b/packages/crawler/mq/task/getVideoStats.ts @@ -0,0 +1,58 @@ +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import { getVideoInfo } from "net/getVideoInfo.ts"; +import { LatestSnapshotType } from "db/schema.d.ts"; +import logger from "log/logger.ts"; + +/* + * Fetch video stats from bilibili API and insert into database + * @returns {Promise} + * A number indicating the status code when receiving non-0 status code from bilibili, + * otherwise a VideoSnapshot object containing the video stats + * @throws {NetSchedulerError} - The error will be thrown in following cases: + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + */ +export async function insertVideoSnapshot( + client: Client, + aid: number, + task:
string, +): Promise<number | LatestSnapshotType> { + const data = await getVideoInfo(aid, task); + if (typeof data == "number") { + return data; + } + const time = new Date().getTime(); + const views = data.stat.view; + const danmakus = data.stat.danmaku; + const replies = data.stat.reply; + const likes = data.stat.like; + const coins = data.stat.coin; + const shares = data.stat.share; + const favorites = data.stat.favorite; + + const query: string = ` + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + `; + await client.queryObject( + query, + [aid, views, danmakus, replies, likes, coins, shares, favorites], + ); + + logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); + + const snapshot: LatestSnapshotType = { + aid, + views, + danmakus, + replies, + likes, + coins, + shares, + favorites, + time, + }; + + return snapshot; +} diff --git a/lib/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts similarity index 82% rename from lib/mq/task/queueLatestVideo.ts rename to packages/crawler/mq/task/queueLatestVideo.ts index d2e938b..d8b3993 100644 --- a/lib/mq/task/queueLatestVideo.ts +++ b/packages/crawler/mq/task/queueLatestVideo.ts @@ -1,10 +1,10 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { getLatestVideoAids } from "lib/net/getLatestVideoAids.ts"; -import { videoExistsInAllData } from "lib/db/allData.ts"; -import { sleep } from "lib/utils/sleep.ts"; +import { getLatestVideoAids } from "net/getLatestVideoAids.ts"; +import { videoExistsInAllData } from "db/allData.ts"; +import { sleep } from "utils/sleep.ts"; import { SECOND } from "$std/datetime/constants.ts"; -import logger from "lib/log/logger.ts"; -import { LatestVideosQueue } from "lib/mq/index.ts"; +import logger from "log/logger.ts"; +import { LatestVideosQueue } from "mq/index.ts"; export async function queueLatestVideos( client: Client, diff --git a/lib/net/bilibili.d.ts b/packages/crawler/net/bilibili.d.ts similarity index 91% rename from lib/net/bilibili.d.ts rename to packages/crawler/net/bilibili.d.ts index 209b566..19e1ba2 100644 --- a/lib/net/bilibili.d.ts +++ b/packages/crawler/net/bilibili.d.ts @@ -9,6 +9,25 @@ export type VideoListResponse = BaseResponse; export type VideoDetailsResponse = BaseResponse<VideoDetailsData>; export type VideoTagsResponse = BaseResponse; export type VideoInfoResponse = BaseResponse<VideoInfoData>; +export type MediaListInfoResponse = BaseResponse<MediaListInfoData>; + +export type MediaListInfoData = MediaListInfoItem[]; + + +export interface MediaListInfoItem { + attr: number; + bvid: string; + id: number; + cnt_info: { + coin: number; + collect: number; + danmaku: number; + play: number; + reply: number; + share: number; + thumb_up: number; + } +} interface VideoInfoData { bvid: string; @@ -26,8 +45,8 @@ interface VideoInfoData { mid: number; name: string; face: string; - }, - stat: VideoStats, + }; + stat: VideoStats; } interface VideoDetailsData { diff --git a/packages/crawler/net/bulkGetVideoStats.ts b/packages/crawler/net/bulkGetVideoStats.ts new file mode 100644 index 0000000..2b0c7f2 --- /dev/null +++ b/packages/crawler/net/bulkGetVideoStats.ts @@ -0,0 +1,27 @@ +import netScheduler from "mq/scheduler.ts"; +import { MediaListInfoData, MediaListInfoResponse } from "net/bilibili.d.ts"; +import logger from "log/logger.ts"; + +/* + * Bulk fetch video metadata from bilibili API + * @param {number[]} aids - The aid list to fetch + * @returns {Promise} MediaListInfoData or the error code returned by
bilibili API + * @throws {NetSchedulerError} - The error will be thrown in following cases: + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + */ +export async function bulkGetVideoStats(aids: number[]): Promise<MediaListInfoData | number> { + const baseURL = `https://api.bilibili.com/medialist/gateway/base/resource/infos?resources=`; + let url = baseURL; + for (const aid of aids) { + url += `${aid}:2,`; + } + const data = await netScheduler.request<MediaListInfoResponse>(url, "bulkSnapshot"); + const errMessage = `Error fetching metadata for aid list: ${aids.join(",")}:`; + if (data.code !== 0) { + logger.error(errMessage + data.code + "-" + data.message, "net", "fn:bulkGetVideoStats"); + return data.code; + } + return data.data; +} diff --git a/lib/net/getLatestVideoAids.ts b/packages/crawler/net/getLatestVideoAids.ts similarity index 83% rename from lib/net/getLatestVideoAids.ts rename to packages/crawler/net/getLatestVideoAids.ts index 2fb44be..7dacd46 100644 --- a/lib/net/getLatestVideoAids.ts +++ b/packages/crawler/net/getLatestVideoAids.ts @@ -1,6 +1,6 @@ -import { VideoListResponse } from "lib/net/bilibili.d.ts"; -import logger from "lib/log/logger.ts"; -import netScheduler from "lib/mq/scheduler.ts"; +import { VideoListResponse } from "net/bilibili.d.ts"; +import logger from "log/logger.ts"; +import netScheduler from "mq/scheduler.ts"; export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> { const startFrom = 1 + pageSize * (page - 1); diff --git a/lib/net/getVideoDetails.ts b/packages/crawler/net/getVideoDetails.ts similarity index 73% rename from lib/net/getVideoDetails.ts rename to packages/crawler/net/getVideoDetails.ts index 9e421cf..d6d52c1 100644 --- a/lib/net/getVideoDetails.ts +++ b/packages/crawler/net/getVideoDetails.ts @@ -1,6 +1,6 @@ -import netScheduler from "lib/mq/scheduler.ts"; -import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts"; -import logger from "lib/log/logger.ts"; +import netScheduler from "mq/scheduler.ts"; +import { VideoDetailsData, VideoDetailsResponse } from "net/bilibili.d.ts"; +import logger from "log/logger.ts"; export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> { const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; diff --git a/packages/crawler/net/getVideoInfo.ts b/packages/crawler/net/getVideoInfo.ts new file mode 100644 index 0000000..0533c53 --- /dev/null +++ b/packages/crawler/net/getVideoInfo.ts @@ -0,0 +1,27 @@ +import netScheduler from "mq/scheduler.ts"; +import { VideoInfoData, VideoInfoResponse } from "net/bilibili.d.ts"; +import logger from "log/logger.ts"; + +/* + * Fetch video metadata from bilibili API + * @param {number} aid - The video's aid + * @param {string} task - The task name used in scheduler.
It can be one of the following: + * - snapshotVideo + * - getVideoInfo + * - snapshotMilestoneVideo + * @returns {Promise} VideoInfoData or the error code returned by bilibili API + * @throws {NetSchedulerError} - The error will be thrown in following cases: + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + */ +export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> { + const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`; + const data = await netScheduler.request<VideoInfoResponse>(url, task); + const errMessage = `Error fetching metadata for ${aid}:`; + if (data.code !== 0) { + logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo"); + return data.code; + } + return data.data; +} diff --git a/src/bullui.ts b/packages/crawler/src/bullui.ts similarity index 97% rename from src/bullui.ts rename to packages/crawler/src/bullui.ts index acf8d3f..5765540 100644 --- a/src/bullui.ts +++ b/packages/crawler/src/bullui.ts @@ -2,7 +2,7 @@ import express from "express"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { ExpressAdapter } from "@bull-board/express"; -import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; +import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts"; const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath("/"); diff --git a/src/filterWorker.ts b/packages/crawler/src/filterWorker.ts similarity index 65% rename from src/filterWorker.ts rename to packages/crawler/src/filterWorker.ts index 8eb43d4..cb336c4 100644 --- a/src/filterWorker.ts +++ b/packages/crawler/src/filterWorker.ts @@ -1,10 +1,10 @@ -import { Job, Worker } from "bullmq"; -import { redis } from "lib/db/redis.ts"; -import logger from "lib/log/logger.ts"; -import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts"; -import { WorkerError } from "lib/mq/schema.ts"; -import { lockManager } from "lib/mq/lockManager.ts"; -import { initializeModels } from "lib/ml/filter_inference.ts"; +import { ConnectionOptions, Job, Worker } from "bullmq"; +import { redis } from "db/redis.ts"; +import logger from "log/logger.ts"; +import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts"; +import { WorkerError } from "mq/schema.ts"; +import { lockManager } from "mq/lockManager.ts"; +import Akari from "ml/akari.ts"; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq"); @@ -18,7 +18,7 @@ Deno.addSignalListener("SIGTERM", async () => { Deno.exit(); }); -await initializeModels(); +await Akari.init(); const filterWorker = new Worker( "classifyVideo", @@ -32,7 +32,7 @@ const filterWorker = new Worker( break; } }, - { connection: redis, concurrency: 2, removeOnComplete: { count: 1000 } }, + { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }, ); filterWorker.on("active", () => { diff --git a/packages/crawler/src/jobAdder.ts b/packages/crawler/src/jobAdder.ts new file mode 100644 index 0000000..3aefd24 --- /dev/null +++ b/packages/crawler/src/jobAdder.ts @@ -0,0 +1,3 @@ +import { initMQ } from "mq/init.ts"; + +await initMQ(); diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts new file mode 100644 index
0000000..e240a0b --- /dev/null +++ b/packages/crawler/src/worker.ts @@ -0,0 +1,109 @@ +import { ConnectionOptions, Job, Worker } from "bullmq"; +import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts"; +import { redis } from "db/redis.ts"; +import logger from "log/logger.ts"; +import { lockManager } from "mq/lockManager.ts"; +import { WorkerError } from "mq/schema.ts"; +import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts"; +import { + collectMilestoneSnapshotsWorker, + regularSnapshotsWorker, + snapshotTickWorker, + takeSnapshotForVideoWorker, + scheduleCleanupWorker, + takeBulkSnapshotForVideosWorker, + bulkSnapshotTickWorker +} from "mq/exec/snapshotTick.ts"; + +Deno.addSignalListener("SIGINT", async () => { + logger.log("SIGINT Received: Shutting down workers...", "mq"); + await latestVideoWorker.close(true); + await snapshotWorker.close(true); + Deno.exit(); +}); + +Deno.addSignalListener("SIGTERM", async () => { + logger.log("SIGTERM Received: Shutting down workers...", "mq"); + await latestVideoWorker.close(true); + await snapshotWorker.close(true); + Deno.exit(); +}); + +const latestVideoWorker = new Worker( + "latestVideos", + async (job: Job) => { + switch (job.name) { + case "getLatestVideos": + await getLatestVideosWorker(job); + break; + case "getVideoInfo": + await getVideoInfoWorker(job); + break; + case "collectSongs": + await collectSongsWorker(job); + break; + default: + break; + } + }, + { + connection: redis as ConnectionOptions, + concurrency: 6, + removeOnComplete: { count: 1440 }, + removeOnFail: { count: 0 }, + }, +); + +latestVideoWorker.on("active", () => { + logger.log("Worker (latestVideos) activated.", "mq"); +}); + +latestVideoWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +}); + +latestVideoWorker.on("closed", async () => { + await lockManager.releaseLock("getLatestVideos"); +}); + +const snapshotWorker = new Worker( + "snapshot", + async (job: Job) => { + switch (job.name) { + case "snapshotVideo": + await takeSnapshotForVideoWorker(job); + break; + case "snapshotTick": + await snapshotTickWorker(job); + break; + case "collectMilestoneSnapshots": + await collectMilestoneSnapshotsWorker(job); + break; + case "dispatchRegularSnapshots": + await regularSnapshotsWorker(job); + break; + case "scheduleCleanup": + await scheduleCleanupWorker(job); + break; + case "bulkSnapshotVideo": + await takeBulkSnapshotForVideosWorker(job); + break; + case "bulkSnapshotTick": + await bulkSnapshotTickWorker(job); + break; + default: + break; + } + }, + { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }, +); + +snapshotWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +}); + +snapshotWorker.on("closed", async () => { + await lockManager.releaseLock("dispatchRegularSnapshots"); +}); diff --git a/lib/utils/formatSeconds.ts b/packages/crawler/utils/formatSeconds.ts similarity index 63% rename from lib/utils/formatSeconds.ts rename to packages/crawler/utils/formatSeconds.ts index ffabb22..491dfd6 100644 --- a/lib/utils/formatSeconds.ts +++ b/packages/crawler/utils/formatSeconds.ts @@ -1,9 +1,9 @@ export const formatSeconds = (seconds: number) => { if (seconds < 60) { - return `${(seconds).toFixed(1)}s`; + return `${seconds.toFixed(1)}s`; } if (seconds < 3600) { - return `${Math.floor(seconds / 60)}m${seconds % 60}s`; + return `${Math.floor(seconds / 60)}m${(seconds % 
60).toFixed(1)}s`; } return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`; }; diff --git a/lib/utils/formatTimestampToPostgre.ts b/packages/crawler/utils/formatTimestampToPostgre.ts similarity index 100% rename from lib/utils/formatTimestampToPostgre.ts rename to packages/crawler/utils/formatTimestampToPostgre.ts diff --git a/lib/utils/sleep.ts b/packages/crawler/utils/sleep.ts similarity index 100% rename from lib/utils/sleep.ts rename to packages/crawler/utils/sleep.ts diff --git a/lib/utils/truncate.ts b/packages/crawler/utils/truncate.ts similarity index 100% rename from lib/utils/truncate.ts rename to packages/crawler/utils/truncate.ts diff --git a/packages/frontend/deno.json b/packages/frontend/deno.json new file mode 100644 index 0000000..e69de29 diff --git a/routes/_404.tsx b/routes/_404.tsx deleted file mode 100644 index 4628eeb..0000000 --- a/routes/_404.tsx +++ /dev/null @@ -1,27 +0,0 @@ -import { Head } from "$fresh/runtime.ts"; - -export default function Error404() { - return ( - <> - - 404 - Page not found - -
-
- the Fresh logo: a sliced lemon dripping with juice -

404 - Page not found

-

- The page you were looking for doesn't exist. -

- Go back home -
-
- - ); -} diff --git a/routes/_app.tsx b/routes/_app.tsx deleted file mode 100644 index a44414e..0000000 --- a/routes/_app.tsx +++ /dev/null @@ -1,16 +0,0 @@ -import { type PageProps } from "$fresh/server.ts"; -export default function App({ Component }: PageProps) { - return ( - - - - - cvsa - - - - - - - ); -} diff --git a/routes/api/joke.ts b/routes/api/joke.ts deleted file mode 100644 index 68b0ebe..0000000 --- a/routes/api/joke.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { FreshContext } from "$fresh/server.ts"; - -// Jokes courtesy of https://punsandoneliners.com/randomness/programmer-jokes/ -const JOKES = [ - "Why do Java developers often wear glasses? They can't C#.", - "A SQL query walks into a bar, goes up to two tables and says “can I join you?”", - "Wasn't hard to crack Forrest Gump's password. 1forrest1.", - "I love pressing the F5 key. It's refreshing.", - "Called IT support and a chap from Australia came to fix my network connection. I asked “Do you come from a LAN down under?”", - "There are 10 types of people in the world. Those who understand binary and those who don't.", - "Why are assembly programmers often wet? They work below C level.", - "My favourite computer based band is the Black IPs.", - "What programme do you use to predict the music tastes of former US presidential candidates? An Al Gore Rhythm.", - "An SEO expert walked into a bar, pub, inn, tavern, hostelry, public house.", -]; - -export const handler = (_req: Request, _ctx: FreshContext): Response => { - const randomIndex = Math.floor(Math.random() * JOKES.length); - const body = JOKES[randomIndex]; - return new Response(body); -}; diff --git a/routes/greet/[name].tsx b/routes/greet/[name].tsx deleted file mode 100644 index a7a5fe1..0000000 --- a/routes/greet/[name].tsx +++ /dev/null @@ -1,5 +0,0 @@ -import { PageProps } from "$fresh/server.ts"; - -export default function Greet(props: PageProps) { - return
Hello {props.params.name}
; -} diff --git a/routes/index.tsx b/routes/index.tsx deleted file mode 100644 index 67a22a7..0000000 --- a/routes/index.tsx +++ /dev/null @@ -1,25 +0,0 @@ -import { useSignal } from "@preact/signals"; -import Counter from "../islands/Counter.tsx"; - -export default function Home() { - const count = useSignal(3); - return ( -
-
- the Fresh logo: a sliced lemon dripping with juice -

Welcome to Fresh

-

- Try updating this message in the - ./routes/index.tsx file, and refresh. -

- -
-
- ); -} diff --git a/src/jobAdder.ts b/src/jobAdder.ts deleted file mode 100644 index cb107f4..0000000 --- a/src/jobAdder.ts +++ /dev/null @@ -1,3 +0,0 @@ -import { initMQ } from "lib/mq/init.ts"; - -await initMQ(); diff --git a/src/worker.ts b/src/worker.ts deleted file mode 100644 index c79e943..0000000 --- a/src/worker.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { Job, Worker } from "bullmq"; -import { collectSongsWorker, getLatestVideosWorker } from "lib/mq/executors.ts"; -import { redis } from "lib/db/redis.ts"; -import logger from "lib/log/logger.ts"; -import { lockManager } from "lib/mq/lockManager.ts"; -import { WorkerError } from "lib/mq/schema.ts"; -import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts"; -import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts"; - -Deno.addSignalListener("SIGINT", async () => { - logger.log("SIGINT Received: Shutting down workers...", "mq"); - await latestVideoWorker.close(true); - Deno.exit(); -}); - -Deno.addSignalListener("SIGTERM", async () => { - logger.log("SIGTERM Received: Shutting down workers...", "mq"); - await latestVideoWorker.close(true); - Deno.exit(); -}); - -const latestVideoWorker = new Worker( - "latestVideos", - async (job: Job) => { - switch (job.name) { - case "getLatestVideos": - await getLatestVideosWorker(job); - break; - case "getVideoInfo": - await getVideoInfoWorker(job); - break; - case "collectSongs": - await collectSongsWorker(job); - break; - default: - break; - } - }, - { connection: redis, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } }, -); - -latestVideoWorker.on("active", () => { - logger.log("Worker (latestVideos) activated.", "mq"); -}); - -latestVideoWorker.on("error", (err) => { - const e = err as WorkerError; - logger.error(e.rawError, e.service, e.codePath); -}); - -latestVideoWorker.on("closed", async () => { - await lockManager.releaseLock("getLatestVideos"); -}); - -const snapshotWorker = new Worker( - "snapshot", - async (job: Job) => { - switch (job.name) { - case "scheduleSnapshotTick": - await snapshotTickWorker(job); - break; - case "snapshotMilestoneVideo": - await takeSnapshotForMilestoneVideoWorker(job); - break; - case "snapshotVideo": - await takeSnapshotForVideoWorker(job); - break; - default: - break; - } - }, - { connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } }, -); - -snapshotWorker.on("error", (err) => { - const e = err as WorkerError; - logger.error(e.rawError, e.service, e.codePath); -}) diff --git a/static/favicon.ico b/static/favicon.ico deleted file mode 100644 index 1cfaaa2..0000000 Binary files a/static/favicon.ico and /dev/null differ diff --git a/static/logo.svg b/static/logo.svg deleted file mode 100644 index ef2fbe4..0000000 --- a/static/logo.svg +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/static/styles.css b/static/styles.css deleted file mode 100644 index b5c61c9..0000000 --- a/static/styles.css +++ /dev/null @@ -1,3 +0,0 @@ -@tailwind base; -@tailwind components; -@tailwind utilities; diff --git a/tailwind.config.ts b/tailwind.config.ts deleted file mode 100644 index 0c790d0..0000000 --- a/tailwind.config.ts +++ /dev/null @@ -1,7 +0,0 @@ -import { type Config } from "tailwindcss"; - -export default { - content: [ - "{routes,islands,components}/**/*.{ts,tsx,js,jsx}", - ], -} satisfies Config; diff --git a/test/mq/rateLimiter.test.ts b/test/mq/rateLimiter.test.ts deleted file mode 100644 index 
2f19723..0000000 --- a/test/mq/rateLimiter.test.ts +++ /dev/null @@ -1,91 +0,0 @@ -import { assertEquals } from "jsr:@std/assert"; -import { SlidingWindow } from "lib/mq/slidingWindow.ts"; -import { RateLimiter, RateLimiterConfig } from "lib/mq/rateLimiter.ts"; -import { Redis } from "npm:ioredis@5.5.0"; - -Deno.test("RateLimiter works correctly", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5; - const maxRequests = 10; - - const slidingWindow = new SlidingWindow(redis, windowSize); - const config: RateLimiterConfig = { - window: slidingWindow, - max: maxRequests, - }; - const rateLimiter = new RateLimiter("test_event", [config]); - await rateLimiter.clear(); - - // Initial availability should be true - assertEquals(await rateLimiter.getAvailability(), true); - - // Trigger events up to the limit - for (let i = 0; i < maxRequests; i++) { - await rateLimiter.trigger(); - } - - // Availability should now be false - assertEquals(await rateLimiter.getAvailability(), false); - - // Wait for the window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize * 1000 + 500)); - - // Availability should be true again - assertEquals(await rateLimiter.getAvailability(), true); - - redis.quit(); -}); - -Deno.test("Multiple configs work correctly", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize1 = 1; - const maxRequests1 = 2; - const windowSize2 = 5; - const maxRequests2 = 6; - - const slidingWindow1 = new SlidingWindow(redis, windowSize1); - const config1: RateLimiterConfig = { - window: slidingWindow1, - max: maxRequests1, - }; - const slidingWindow2 = new SlidingWindow(redis, windowSize2); - const config2: RateLimiterConfig = { - window: slidingWindow2, - max: maxRequests2, - }; - const rateLimiter = new RateLimiter("test_event_multi", [config1, config2]); - await rateLimiter.clear(); - - // Initial availability should be true - assertEquals(await rateLimiter.getAvailability(), true); - - // Trigger events up to the limit of the first config - for (let i = 0; i < maxRequests1; i++) { - await rateLimiter.trigger(); - } - - // Availability should now be false (due to config1) - assertEquals(await rateLimiter.getAvailability(), false); - - // Wait for the first window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize1 * 1000 + 500)); - - // Availability should now be true (due to config1) - assertEquals(await rateLimiter.getAvailability(), true); - - // Trigger events up to the limit of the second config - for (let i = maxRequests1; i < maxRequests2; i++) { - await rateLimiter.trigger(); - } - - // Availability should still be false (due to config2) - assertEquals(await rateLimiter.getAvailability(), false); - - // Wait for the second window to slide - await new Promise((resolve) => setTimeout(resolve, windowSize2 * 1000 + 500)); - - // Availability should be true again - assertEquals(await rateLimiter.getAvailability(), true); - - redis.quit(); -}); diff --git a/test/mq/slidingWindow.test.ts b/test/mq/slidingWindow.test.ts deleted file mode 100644 index a749edc..0000000 --- a/test/mq/slidingWindow.test.ts +++ /dev/null @@ -1,84 +0,0 @@ -import { assertEquals } from "jsr:@std/assert"; -import { SlidingWindow } from "lib/mq/slidingWindow.ts"; -import { Redis } from "ioredis"; - -Deno.test("SlidingWindow - event and count", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new 
SlidingWindow(redis, windowSize); - const eventName = "test_event"; - await slidingWindow.clear(eventName); - - await slidingWindow.event(eventName); - const count = await slidingWindow.count(eventName); - - assertEquals(count, 1); - redis.quit(); -}); - -Deno.test("SlidingWindow - multiple events", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new SlidingWindow(redis, windowSize); - const eventName = "test_event"; - await slidingWindow.clear(eventName); - - await slidingWindow.event(eventName); - await slidingWindow.event(eventName); - await slidingWindow.event(eventName); - const count = await slidingWindow.count(eventName); - - assertEquals(count, 3); - redis.quit(); -}); - -Deno.test("SlidingWindow - no events", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new SlidingWindow(redis, windowSize); - const eventName = "test_event"; - await slidingWindow.clear(eventName); - - const count = await slidingWindow.count(eventName); - - assertEquals(count, 0); - redis.quit(); -}); - -Deno.test("SlidingWindow - different event names", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new SlidingWindow(redis, windowSize); - const eventName1 = "test_event_1"; - const eventName2 = "test_event_2"; - await slidingWindow.clear(eventName1); - await slidingWindow.clear(eventName2); - - await slidingWindow.event(eventName1); - await slidingWindow.event(eventName2); - - const count1 = await slidingWindow.count(eventName1); - const count2 = await slidingWindow.count(eventName2); - - assertEquals(count1, 1); - assertEquals(count2, 1); - redis.quit(); -}); - -Deno.test("SlidingWindow - large number of events", async () => { - const redis = new Redis({ maxRetriesPerRequest: null }); - const windowSize = 5000; // 5 seconds - const slidingWindow = new SlidingWindow(redis, windowSize); - const eventName = "test_event"; - await slidingWindow.clear(eventName); - const numEvents = 1000; - - for (let i = 0; i < numEvents; i++) { - await slidingWindow.event(eventName); - } - - const count = await slidingWindow.count(eventName); - - assertEquals(count, numEvents); - redis.quit(); -});
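
Note that the two test files above are removed without a monorepo counterpart in this diff. If they are later re-homed under `packages/crawler`, only the import specifiers should need to change to the new bare aliases (`mq/...` instead of `lib/mq/...`). Below is a minimal sketch of the first SlidingWindow test after such a move; the target path is an assumption, not part of this change:

```ts
// packages/crawler/test/mq/slidingWindow.test.ts (hypothetical location)
import { assertEquals } from "jsr:@std/assert";
import { SlidingWindow } from "mq/slidingWindow.ts"; // was "lib/mq/slidingWindow.ts"
import { Redis } from "ioredis";

Deno.test("SlidingWindow - event and count", async () => {
	const redis = new Redis({ maxRetriesPerRequest: null });
	const slidingWindow = new SlidingWindow(redis, 5000); // 5 seconds
	const eventName = "test_event";
	await slidingWindow.clear(eventName);

	await slidingWindow.event(eventName);
	const count = await slidingWindow.count(eventName);

	assertEquals(count, 1);
	redis.quit();
});
```
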