Compare commits

...

54 Commits

Author SHA1 Message Date
be3ff00edc
fix: incorrect path for model file 2025-03-29 18:52:44 +08:00
879a6604e5
fix: missing export in deno.json 2025-03-29 18:43:47 +08:00
d88ad099c4
ref: monorepo support 2025-03-29 18:13:57 +08:00
636c5e25cb
ref: move ML stuff
add: .idea to VCS, the refactor guide
2025-03-29 14:13:15 +08:00
7337538f0b
fix: snapshotBefore may returns null even there's a snapshot exists 2025-03-27 05:49:19 +08:00
9e3cc8236c
fix: incorrect iterations counting 2025-03-27 04:15:46 +08:00
01171f5de3
update: remove WINDOW_SIZE 2025-03-27 04:14:12 +08:00
6a7f246562
update: increase limit of getBulkSnapshotsInNextSecond 2025-03-27 04:05:35 +08:00
49098763f1
fix: mixing bulk tasks with other tasks 2025-03-27 03:57:47 +08:00
cd160c486e
update: logging in bulk 2025-03-27 03:51:35 +08:00
c12379134c
update: error handling in bulk fetch 2025-03-27 03:44:41 +08:00
189bb294cb
improve: logging 2025-03-27 03:31:48 +08:00
7ad4255fa7
update: remove lastAvailableWindow 2025-03-27 03:26:13 +08:00
de061eeb0f
update: rate limit config and number of schedules obtained by tick 2025-03-27 03:21:01 +08:00
767e19b425
fix: bigint serialization failed 2025-03-27 03:15:35 +08:00
d8201c7f8e
update: shallow copy instead of re-asignment when creating copy of rate limitor config 2025-03-27 03:13:05 +08:00
d229e49ff2
feat: bulk fetch 2025-03-27 02:58:42 +08:00
aea9e10d1a
update: bulk fetch 2025-03-27 02:33:54 +08:00
d44ba8a0ae
fix: incorrect offset calculation in adjustSnapshotTime 2025-03-27 02:13:33 +08:00
d6dd4d9334
fix: potential shifting for obtained window offset in last commit 2025-03-27 02:09:48 +08:00
d5c278ae06
improve: cache result for adjustSnapshotTime 2025-03-27 01:48:10 +08:00
c80b047e0c
fix: did not release db client before quiting 2025-03-27 01:31:54 +08:00
92678066a7
update: forgot to plus one for iteration count 2025-03-26 23:37:12 +08:00
afee7f58bf
update: return when meeting non-0 video status code in snapshotVideo job 2025-03-26 23:25:07 +08:00
5450c17e13
update: reduce rate limit, no longer collect deleted videos for milestone monitoring 2025-03-26 23:13:53 +08:00
2c51c3c09c
add: log for getRegularSnapshotInterval 2025-03-26 23:04:08 +08:00
7adc370ba2
update: increase concurrency of snapshot worker 2025-03-26 23:00:45 +08:00
b286a9d7b1
update: use findSnapshotBefore instead of findClosetSnapshot in regular snapshot scheduling 2025-03-26 22:59:52 +08:00
cabb360a16
fix: missing entrance for schedule type 'new'
update: perf log text
2025-03-25 22:58:56 +08:00
17ded63758
update: perf monitor for adjustSnapshotTime 2025-03-25 22:38:57 +08:00
2d8e990bc9
fix: prevent duplicate jobs added to queue 2025-03-25 21:33:58 +08:00
b33fd790d1
fix: snapshot process early returned incorrectly 2025-03-25 21:29:47 +08:00
86337e3802
add: set scheduled job status to 'processing' before adding to bullmq 2025-03-25 05:05:33 +08:00
76c9256662
test: looser rate limits for snapshot 2025-03-25 04:57:24 +08:00
a178b7fc16
update: force schedule for new songs 2025-03-25 04:52:19 +08:00
6b7142a6d5
fix: new jobs may be scheduled before the current time 2025-03-24 23:43:54 +08:00
314beb54b5
add: upsertJob for scheduleCleanup 2025-03-24 23:39:29 +08:00
d8b47f8fc8
add: log for scheduleCleanupWorker 2025-03-24 23:37:46 +08:00
fa058b22fe
fix: job consumption rate too low, add outdated job cleanup 2025-03-24 23:36:01 +08:00
48b1130cba
feat: continuous monitoring of new songs 2025-03-24 04:53:43 +08:00
42db333d1a
temp: schedule for new songs 2025-03-24 04:40:10 +08:00
0a73d28623
fix: incorrect time base when adjusting snapshot time 2025-03-24 04:15:51 +08:00
584a1be9f9
test: debug for adjusting snapshot time 2025-03-24 04:08:34 +08:00
20731c0530
test: debug for init window 2025-03-24 04:05:22 +08:00
cb573e55d9
fix: incorrect offset when initializing 2025-03-24 04:01:44 +08:00
8be68248df
fix: ignored the case where snapshots are actually the same 2025-03-24 03:49:06 +08:00
6f4a26e8b3
test: dynamic interval for reglar snapshots 2025-03-24 03:43:26 +08:00
c99318e2d3
update: missing env in deno job 2025-03-24 02:39:57 +08:00
3028dc13c7
improve: performance when dispatching jobs 2025-03-24 02:35:55 +08:00
0455abce2e
fix: did not quit when job is already workingx 2025-03-24 01:10:27 +08:00
723b6090c4
fix: 1 dispatchRegularSnapshots working at a time 2025-03-24 01:06:48 +08:00
9060d28823
fix: prevent dispatchRegularSnapshots from running for too long 2025-03-24 00:57:48 +08:00
0be961e709
improve: target time finding 2025-03-23 23:31:16 +08:00
f9dd53c250
update: remove old jobScheduler 2025-03-23 21:48:42 +08:00
122 changed files with 1169 additions and 1139 deletions

12
.gitignore vendored
View File

@ -51,7 +51,6 @@ internal/
!tests/cases/projects/projectOption/**/node_modules
!tests/cases/projects/NodeModulesSearch/**/*
!tests/baselines/reference/project/nodeModules*/**/*
.idea
yarn.lock
yarn-error.log
.parallelperf.*
@ -78,10 +77,11 @@ node_modules/
# project specific
logs/
__pycache__
filter/runs
pred/runs
pred/checkpoints
data/
filter/checkpoints
ml/filter/runs
ml/pred/runs
ml/pred/checkpoints
ml/pred/observed
ml/data/
ml/filter/checkpoints
scripts
model/

9
.idea/.gitignore vendored Normal file
View File

@ -0,0 +1,9 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
dataSources.xml

21
.idea/cvsa.iml Normal file
View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.tmp" />
<excludeFolder url="file://$MODULE_DIR$/temp" />
<excludeFolder url="file://$MODULE_DIR$/tmp" />
<excludeFolder url="file://$MODULE_DIR$/ml/data" />
<excludeFolder url="file://$MODULE_DIR$/doc" />
<excludeFolder url="file://$MODULE_DIR$/ml/filter/checkpoints" />
<excludeFolder url="file://$MODULE_DIR$/ml/filter/runs" />
<excludeFolder url="file://$MODULE_DIR$/ml/lab/data" />
<excludeFolder url="file://$MODULE_DIR$/ml/lab/temp" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
<excludeFolder url="file://$MODULE_DIR$/model" />
<excludeFolder url="file://$MODULE_DIR$/src/db" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@ -0,0 +1,12 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="GrazieInspection" enabled="false" level="GRAMMAR_ERROR" enabled_by_default="false" />
<inspection_tool class="LanguageDetectionInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
</profile>
</component>

8
.idea/modules.xml Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/cvsa.iml" filepath="$PROJECT_DIR$/.idea/cvsa.iml" />
</modules>
</component>
</project>

6
.idea/sqldialects.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="PROJECT" dialect="PostgreSQL" />
</component>
</project>

6
.idea/vcs.xml Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

65
README-refactor.md Normal file
View File

@ -0,0 +1,65 @@
# 项目重构方案
## 目标架构
采用monorepo结构管理三个独立部分
1. `packages/crawler` - 现有爬虫功能
2. `packages/frontend` - 基于Astro的前端
3. `packages/backend` - 基于Hono的API后端
## 目录结构调整方案
### 新结构
```
.
├── packages/
│ ├── crawler/ # 爬虫组件
│ ├── frontend/ # Astro前端
│ ├── backend/ # Hono后端API
│ └── core/ # 共享代码(未来提取)
├── docs/ # 文档
├── scripts/ # 项目脚本
└── package.json # 根项目配置
```
### 具体迁移方案
#### 1. 爬虫部分(crawler)
保留以下目录/文件:
- `lib/` (除前端相关)
- `src/db/raw/`
- `src/filterWorker.ts`
- `src/worker.ts`
- `test/`
- `deno.json`
- `.gitignore`
需要移除:
- Fresh框架相关文件
- 前端组件(`components/`)
- 静态资源(`static/`)
#### 2. 前端部分(frontend)
全新创建Astro项目不保留任何现有前端代码
#### 3. 后端部分(backend)
全新创建Hono项目
#### 4. 共享代码(core)
未来可从爬虫中提取以下内容到core package
- 数据库相关:`lib/db/`
- 消息队列:`lib/mq/`
- 网络请求:`lib/net/`
- 工具函数:`lib/utils/`
## 重构步骤建议
1. 初始化monorepo结构
2. 迁移爬虫代码到`packages/crawler`
3. 创建新的Astro项目在`packages/frontend`
4. 创建新的Hono项目在`packages/backend`
5. 逐步提取共享代码到`packages/core`
## 注意事项
- 机器学习相关代码(`pred/`, `filter/`, `lab/`)保持现状
- 文档(`doc/`)可以迁移到`docs/`目录
- 需要更新CI/CD流程支持monorepo

View File

@ -1,12 +0,0 @@
import { JSX } from "preact";
import { IS_BROWSER } from "$fresh/runtime.ts";
/**
 * Shared styled button. Disabled during server rendering (no hydration yet)
 * or whenever the caller passes `disabled`; all other props pass through.
 */
export function Button(props: JSX.HTMLAttributes<HTMLButtonElement>) {
	const isDisabled = !IS_BROWSER || props.disabled;
	return (
		<button
			{...props}
			disabled={isDisabled}
			class="px-2 py-1 border-gray-500 border-2 rounded bg-white hover:bg-gray-200 transition-colors"
		/>
	);
}

View File

@ -1,55 +0,0 @@
import json
import random


def process_data(input_file, output_file):
    """Build a labeled dataset from rows where model and human disagree.

    Reads JSONL rows from ``input_file``; keeps every row whose ``model`` and
    ``human`` values differ, plus a random ~20% sample of agreeing rows.
    For each kept row the ``model`` key is dropped and ``human`` is renamed to
    ``label``; the result is appended to ``output_file``. Rows whose ``aid``
    already appears in ``output_file`` are skipped (de-duplication).

    Args:
        input_file (str): Path of the JSONL file to read.
        output_file (str): Path of the JSONL file to append to.
    """
    # Load aids already present in output_file so re-runs never write duplicates.
    existing_data = set()
    try:
        with open(output_file, 'r', encoding='utf-8') as f_out:
            for line in f_out:
                try:
                    data = json.loads(line)
                    existing_data.add(data['aid'])
                except json.JSONDecodeError:
                    pass  # skip malformed lines, keep scanning
    except FileNotFoundError:
        pass  # no output file yet: nothing to de-duplicate against

    with open(input_file, 'r', encoding='utf-8') as f_in, open(output_file, 'a', encoding='utf-8') as f_out:
        for line in f_in:
            try:
                data = json.loads(line)
                # Disagreements are always kept; agreeing rows with p=0.2.
                # (random.random() is only consumed when the labels agree,
                # because `or` short-circuits.)
                if data['model'] != data['human'] or random.random() < 0.2:
                    if data['aid'] not in existing_data:
                        del data['model']
                        data['label'] = data['human']
                        del data['human']
                        f_out.write(json.dumps(data, ensure_ascii=False) + '\n')
                        existing_data.add(data['aid'])
            except json.JSONDecodeError as e:
                print(f"JSON解码错误: {e}")
                print(f"错误行内容: {line.strip()}")
            except KeyError as e:
                print(f"KeyError: 键 '{e}' 不存在")
                print(f"错误行内容: {line.strip()}")


if __name__ == "__main__":
    # Guarded so importing this module does not immediately touch the files:
    # the original ran process_data() at import time and crashed with
    # FileNotFoundError whenever 'real_test.jsonl' was absent.
    input_file = 'real_test.jsonl'
    output_file = 'labeled_data.jsonl'
    process_data(input_file, output_file)
    print(f"处理完成,结果已写入 {output_file}")

View File

@ -1,60 +1,8 @@
{
"lock": false,
"tasks": {
"crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts",
"crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts",
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
"cli": "echo \"import '\\$fresh/src/dev/cli.ts'\" | deno run --unstable -A -",
"manifest": "deno task cli manifest $(pwd)",
"start": "deno run -A --watch=static/,routes/ dev.ts",
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
},
"lint": {
"rules": {
"tags": ["fresh", "recommended"]
}
},
"exclude": ["**/_fresh/*"],
"imports": {
"@std/assert": "jsr:@std/assert@1",
"$fresh/": "https://deno.land/x/fresh@1.7.3/",
"preact": "https://esm.sh/preact@10.22.0",
"preact/": "https://esm.sh/preact@10.22.0/",
"@preact/signals": "https://esm.sh/*@preact/signals@1.2.2",
"@preact/signals-core": "https://esm.sh/*@preact/signals-core@1.5.1",
"tailwindcss": "npm:tailwindcss@3.4.1",
"tailwindcss/": "npm:/tailwindcss@3.4.1/",
"tailwindcss/plugin": "npm:/tailwindcss@3.4.1/plugin.js",
"$std/": "https://deno.land/std@0.216.0/",
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq",
"lib/": "./lib/",
"ioredis": "npm:ioredis",
"@bull-board/api": "npm:@bull-board/api",
"@bull-board/express": "npm:@bull-board/express",
"express": "npm:express",
"src/": "./src/",
"onnxruntime": "npm:onnxruntime-node@1.19.2",
"chalk": "npm:chalk"
},
"compilerOptions": {
"jsx": "react-jsx",
"jsxImportSource": "preact"
},
"workspace": ["./packages/crawler", "./packages/frontend", "./packages/backend", "./packages/core"],
"nodeModulesDir": "auto",
"fmt": {
"useTabs": true,
"lineWidth": 120,
"indentWidth": 4,
"semiColons": true,
"proseWrap": "always"
"tasks": {
"crawler": "deno task --filter 'crawler' all"
}
}

7
dev.ts
View File

@ -1,7 +0,0 @@
#!/usr/bin/env -S deno run -A --watch=static/,routes/
// Development entry point: the shebang above runs Deno with full permissions
// and file watching on static/ and routes/. Environment variables are loaded
// from .env (dotenv side-effect import) before the dev server boots main.ts
// with the shared Fresh config.
import dev from "$fresh/dev.ts";
import config from "./fresh.config.ts";
import "$std/dotenv/load.ts";
await dev(import.meta.url, "./main.ts", config);

View File

@ -1,6 +0,0 @@
import { defineConfig } from "$fresh/server.ts";
import tailwind from "$fresh/plugins/tailwind.ts";
// Fresh framework configuration shared by dev.ts and main.ts;
// currently only enables the Tailwind CSS plugin.
export default defineConfig({
	plugins: [tailwind()],
});

View File

@ -1,27 +0,0 @@
// DO NOT EDIT. This file is generated by Fresh.
// This file SHOULD be checked into source version control.
// This file is automatically updated during development when running `dev.ts`.
import * as $_404 from "./routes/_404.tsx";
import * as $_app from "./routes/_app.tsx";
import * as $api_joke from "./routes/api/joke.ts";
import * as $greet_name_ from "./routes/greet/[name].tsx";
import * as $index from "./routes/index.tsx";
import * as $Counter from "./islands/Counter.tsx";
import type { Manifest } from "$fresh/server.ts";
const manifest = {
routes: {
"./routes/_404.tsx": $_404,
"./routes/_app.tsx": $_app,
"./routes/api/joke.ts": $api_joke,
"./routes/greet/[name].tsx": $greet_name_,
"./routes/index.tsx": $index,
},
islands: {
"./islands/Counter.tsx": $Counter,
},
baseUrl: import.meta.url,
} satisfies Manifest;
export default manifest;

View File

@ -1,16 +0,0 @@
import type { Signal } from "@preact/signals";
import { Button } from "../components/Button.tsx";
// Props for the Counter island: a Preact signal holding the shared count.
interface CounterProps {
	count: Signal<number>;
}

// Interactive counter island: renders -1/+1 buttons that mutate the signal
// in place, so any other subscriber of the same signal re-renders too.
export default function Counter(props: CounterProps) {
	return (
		<div class="flex gap-8 py-6">
			<Button onClick={() => props.count.value -= 1}>-1</Button>
			<p class="text-3xl tabular-nums">{props.count}</p>
			<Button onClick={() => props.count.value += 1}>+1</Button>
		</div>
	);
}

View File

@ -1,228 +0,0 @@
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
import {SnapshotScheduleType} from "./schema.d.ts";
import logger from "lib/log/logger.ts";
/**
 * Check whether a snapshot_schedule row with the given id exists.
 */
export async function snapshotScheduleExists(client: Client, id: number) {
	const { rows } = await client.queryObject<{ id: number }>(
		`SELECT id FROM snapshot_schedule WHERE id = $1`,
		[id],
	);
	return rows.length > 0;
}
/**
 * Returns true if the specified `aid` has at least one schedule row in
 * "pending" or "processing" status.
 */
export async function videoHasActiveSchedule(client: Client, aid: number) {
	const { rows } = await client.queryObject<{ status: string }>(
		`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
		[aid],
	);
	return rows.length > 0;
}
/**
 * Returns true if the specified `aid` has a schedule currently in
 * "processing" status (i.e. a snapshot job is running for it right now).
 */
export async function videoHasProcessingSchedule(client: Client, aid: number) {
	const { rows } = await client.queryObject<{ status: string }>(
		`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
		[aid],
	);
	return rows.length > 0;
}
interface Snapshot {
created_at: number;
views: number;
}
/**
 * Find the snapshot of `aid` whose created_at is closest to `targetTime`,
 * in either direction. Returns null when the video has no snapshots.
 */
export async function findClosestSnapshot(
	client: Client,
	aid: number,
	targetTime: Date,
): Promise<Snapshot | null> {
	const query = `
		SELECT created_at, views
		FROM video_snapshot
		WHERE aid = $1
		ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz)))
		LIMIT 1
	`;
	const { rows } = await client.queryObject<{ created_at: string; views: number }>(
		query,
		[aid, targetTime.toISOString()],
	);
	const closest = rows[0];
	if (!closest) return null;
	return {
		created_at: new Date(closest.created_at).getTime(),
		views: closest.views,
	};
}
/**
 * True when the video already has two or more snapshots
 * (the minimum needed to compute a growth speed).
 */
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
	const { rows } = await client.queryObject<{ count: number }>(
		`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
		[aid],
	);
	return rows[0].count >= 2;
}
/**
 * Most recent snapshot of `aid`, or null if the video has none.
 */
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
	const { rows } = await client.queryObject<{ created_at: string; views: number }>(
		`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
		[aid],
	);
	const latest = rows[0];
	if (!latest) return null;
	return {
		created_at: new Date(latest.created_at).getTime(),
		views: latest.views,
	};
}
/**
 * Returns the number of pending snapshot schedules within the specified range.
 * @param client The database client.
 * @param start The start time of the range. (Timestamp in milliseconds)
 * @param end The end time of the range. (Timestamp in milliseconds)
 */
export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) {
	const query = `
		SELECT COUNT(*) FROM snapshot_schedule
		WHERE started_at BETWEEN $1 AND $2
		AND status = 'pending'
	`;
	const { rows } = await client.queryObject<{ count: number }>(query, [
		formatTimestampToPsql(start),
		formatTimestampToPsql(end),
	]);
	return rows[0].count;
}
/**
 * Create a snapshot schedule for a video, unless one is already active.
 * @param client The database client.
 * @param aid The aid of the video.
 * @param type Schedule type (e.g. "milestone", "normal").
 * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
 */
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
	const hasActive = await videoHasActiveSchedule(client, aid);
	if (hasActive) return;
	// Milestone schedules get a denser per-window budget than normal ones.
	const allowedCount = type === "milestone" ? 2000 : 800;
	const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
	logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
	return client.queryObject(
		`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
		[aid, type, adjustedTime.toISOString()],
	);
}
/**
 * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit.
 * Scans 5-minute windows from (expectedStartTime - 2h, clamped to now) up to 24 hours
 * later and returns the first window with spare capacity plus a random in-window delay;
 * falls back to the requested time when no window has capacity or the free window is
 * effectively immediate.
 * @param client PostgreSQL client
 * @param expectedStartTime The expected snapshot time
 * @param allowedCounts The number of snapshots allowed in each 5-minute window.
 * @returns The adjusted actual snapshot time
 */
export async function adjustSnapshotTime(
	client: Client,
	expectedStartTime: Date,
	allowedCounts: number = 2000
): Promise<Date> {
	// allowedCounts is now bound as $3 instead of being interpolated into the
	// SQL text: the statement stays static (plan-cacheable) and no value is
	// ever spliced into the query string.
	const findWindowQuery = `
		WITH windows AS (
			SELECT generate_series(
				$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
				$2::timestamp, -- End time: 24 hours after the target time window starts
				INTERVAL '5 MINUTES'
			) AS window_start
		)
		SELECT w.window_start
		FROM windows w
		LEFT JOIN snapshot_schedule s ON s.started_at >= w.window_start
			AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
			AND s.status = 'pending'
		GROUP BY w.window_start
		HAVING COUNT(s.*) < $3
		ORDER BY w.window_start
		LIMIT 1;
	`;
	const now = new Date();
	const targetTime = expectedStartTime.getTime();
	// Begin scanning 2 hours before the requested time, but never in the past.
	let start = new Date(targetTime - 2 * HOUR);
	if (start.getTime() <= now.getTime()) {
		start = now;
	}
	const startTruncated = truncateTo5MinInterval(start);
	const end = new Date(startTruncated.getTime() + 1 * DAY);
	const windowResult = await client.queryObject<{ window_start: Date }>(
		findWindowQuery,
		[startTruncated, end, allowedCounts],
	);
	const windowStart = windowResult.rows[0]?.window_start;
	if (!windowStart) {
		// No window has capacity within the next 24h; keep the requested time.
		return expectedStartTime;
	}
	if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
		// Spread schedules inside the window with a random 0-5 minute delay.
		const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
		return new Date(windowStart.getTime() + randomDelay);
	} else {
		// The free window is (nearly) immediate; the requested time is fine.
		return expectedStartTime;
	}
}
/**
 * Truncate a timestamp down to the start of its 5-minute interval
 * (local time; seconds and milliseconds are zeroed).
 * @param timestamp The timestamp
 * @returns The truncated time
 */
function truncateTo5MinInterval(timestamp: Date): Date {
	const truncated = new Date(timestamp.getTime());
	truncated.setMinutes(Math.floor(truncated.getMinutes() / 5) * 5, 0, 0);
	return truncated;
}
/**
 * Fetch up to 3 pending schedules that are due within the next second,
 * milestone schedules first, then ordered by start time.
 */
export async function getSnapshotsInNextSecond(client: Client) {
	const query = `
		SELECT *
		FROM snapshot_schedule
		WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending'
		ORDER BY
			CASE
				WHEN type = 'milestone' THEN 0
				ELSE 1
			END,
			started_at
		LIMIT 3;
	`;
	const { rows } = await client.queryObject<SnapshotScheduleType>(query, []);
	return rows;
}
/**
 * Update the status column of one snapshot_schedule row.
 */
export async function setSnapshotStatus(client: Client, id: number, status: string) {
	const query = `UPDATE snapshot_schedule SET status = $2 WHERE id = $1`;
	return await client.queryObject(query, [id, status]);
}
/**
 * Aids of all songs that have no pending or processing snapshot schedule.
 */
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
	const query: string = `
		SELECT s.aid
		FROM songs s
		LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
		WHERE ss.aid IS NULL
	`;
	const { rows } = await client.queryObject<{ aid: number }>(query, []);
	return rows.map((row) => Number(row.aid));
}

View File

@ -1,22 +0,0 @@
import { AIManager } from "lib/ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
// Path of the ONNX model this manager serves, relative to the process CWD.
const modelPath = "./model/model.onnx";

/**
 * AIManager subclass registering a single ONNX model under the name
 * "predictor". NOTE(review): init() only delegates to super.init();
 * the actual model-loading semantics live in AIManager, not visible here.
 */
class MantisProto extends AIManager {
	constructor() {
		super();
		// Model registry consumed by AIManager.init(); key is the model name.
		this.models = {
			"predictor": modelPath,
		};
	}

	public override async init(): Promise<void> {
		await super.init();
	}
}

// Module-level singleton; the class itself is intentionally not exported.
const Mantis = new MantisProto();
export default Mantis;

View File

@ -1,200 +0,0 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
import {
findClosestSnapshot,
getLatestSnapshot,
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
hasAtLeast2Snapshots,
scheduleSnapshot,
setSnapshotStatus,
snapshotScheduleExists,
videoHasProcessingSchedule,
} from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { WEEK, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts";
// BullMQ job priority per schedule type (lower number = higher priority).
// Types missing from this map fall back to priority 3 in snapshotTickWorker.
const priorityMap: { [key: string]: number } = {
	"milestone": 1,
	"normal": 3,
};

// Maps a schedule type to the rate-limiter task name passed to
// insertVideoSnapshot; unknown types fall back to "snapshotVideo".
const snapshotTypeToTaskMap: { [key: string]: string } = {
	"milestone": "snapshotMilestoneVideo",
	"normal": "snapshotVideo",
};
/**
 * Queue tick: moves schedules due within the next second onto the snapshot
 * queue, with BullMQ priority derived from the schedule type.
 */
export const snapshotTickWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const dueSchedules = await getSnapshotsInNextSecond(client);
		for (const schedule of dueSchedules) {
			const priority = schedule.type && priorityMap[schedule.type] ? priorityMap[schedule.type] : 3;
			await SnapshotQueue.add("snapshotVideo", {
				aid: Number(schedule.aid),
				id: Number(schedule.id),
				type: schedule.type ?? "normal",
			}, { priority });
		}
	} catch (e) {
		logger.error(e as Error);
	} finally {
		client.release();
	}
};
/**
 * Next view-count milestone (100k, 1M, or 10M) for the given view count.
 */
export const closetMilestone = (views: number) => {
	for (const milestone of [100000, 1000000]) {
		if (views < milestone) return milestone;
	}
	return 10000000;
};
// Logarithm of `value` in an arbitrary `base` (default 10), via change of base.
const log = (value: number, base: number = 10) => {
	return Math.log(value) / Math.log(base);
};
/**
 * Returns the minimum ETA in hours until the video's next milestone,
 * estimated from growth speed over several look-back intervals.
 * @param client - Postgres client
 * @param aid - aid of the video
 * @returns ETA in hours (0 forces an immediate snapshot; Infinity = no estimate)
 */
const getAdjustedShortTermETA = async (client: Client, aid: number) => {
	const latestSnapshot = await getLatestSnapshot(client, aid);
	// Immediately dispatch a snapshot if there is no snapshot yet
	if (!latestSnapshot) return 0;
	// Need at least two snapshots to compute a growth speed.
	const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
	if (!snapshotsEnough) return 0;
	const currentTimestamp = new Date().getTime();
	// Look-back horizons; the minimum ETA across them is returned.
	const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
	// Guards against division by zero when time gap or speed is zero.
	const DELTA = 0.00001;
	let minETAHours = Infinity;
	for (const timeInterval of timeIntervals) {
		// Speed = view growth between the latest snapshot and the snapshot
		// closest to `timeInterval` ago.
		const date = new Date(currentTimestamp - timeInterval);
		const snapshot = await findClosestSnapshot(client, aid, date);
		if (!snapshot) continue;
		const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
		const viewsDiff = latestSnapshot.views - snapshot.views;
		if (viewsDiff <= 0) continue;
		const speed = viewsDiff / (hoursDiff + DELTA);
		const target = closetMilestone(latestSnapshot.views);
		const viewsToIncrease = target - latestSnapshot.views;
		const eta = viewsToIncrease / (speed + DELTA);
		// Correction factor shrinking the ETA when the milestone is close.
		// NOTE(review): constants 2.97 / 1.14 appear hand-fitted — confirm
		// their origin before changing; factor is clamped to [3, 100].
		let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
		factor = truncate(factor, 3, 100);
		const adjustedETA = eta / factor;
		if (adjustedETA < minETAHours) {
			minETAHours = adjustedETA;
		}
	}
	// The float math above can yield NaN; treat that as "no estimate".
	if (isNaN(minETAHours)) {
		minETAHours = Infinity;
	}
	return minETAHours;
};
/**
 * For every video near a milestone, estimate its short-term ETA and — when it
 * is within 72 hours — schedule a "milestone" snapshot after a delay
 * proportional to the ETA, clamped to [1 second, 4 hours].
 */
export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const videos = await getVideosNearMilestone(client);
		for (const video of videos) {
			const aid = Number(video.aid);
			const eta = await getAdjustedShortTermETA(client, aid);
			// ETA over 72h: too far away to be worth milestone monitoring now.
			if (eta > 72) continue;
			const now = Date.now();
			const scheduledNextSnapshotDelay = eta * HOUR;
			const maxInterval = 4 * HOUR;
			const minInterval = 1 * SECOND;
			// Clamp so we re-check at least every 4h but never schedule in the past.
			const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
			const targetTime = now + delay;
			await scheduleSnapshot(client, aid, "milestone", targetTime);
		}
	} catch (e) {
		logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
	} finally {
		client.release();
	}
};
/**
 * Schedule a "normal" snapshot for every video with no active schedule,
 * targeting roughly 24h after its latest snapshot (clamped to the future).
 */
export const regularSnapshotsWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const aidsWithoutSchedule = await getVideosWithoutActiveSnapshotSchedule(client);
		for (const rawAid of aidsWithoutSchedule) {
			const aid = Number(rawAid);
			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
			const now = Date.now();
			// Videos with no snapshot yet are treated as snapshotted "now".
			const lastSnapshotedAt = latestSnapshot?.time ?? now;
			// Clamp the target into (now, now + 100000 weeks).
			const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
			await scheduleSnapshot(client, aid, "normal", targetTime);
		}
	} catch (e) {
		logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
	} finally {
		client.release();
	}
};
/**
 * Take one snapshot for a scheduled video job.
 *
 * Flow: verify the schedule row still exists, mark it "processing", fetch and
 * insert the video stats, mark it "completed", then (for recurring types)
 * schedule the follow-up snapshot. On NO_PROXY_AVAILABLE the schedule is
 * re-created after a type-dependent retry interval instead of being failed.
 */
export const takeSnapshotForVideoWorker = async (job: Job) => {
	const id = job.data.id;
	const aid = Number(job.data.aid);
	const type = job.data.type;
	const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
	const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
	const client = await db.connect();
	try {
		// Fixed: this check previously ran BEFORE the try/finally, so the
		// early return (and any throw from the query) leaked the db client.
		const exists = await snapshotScheduleExists(client, id);
		if (!exists) {
			return;
		}
		if (await videoHasProcessingSchedule(client, aid)) {
			return `ALREADY_PROCESSING`;
		}
		await setSnapshotStatus(client, id, "processing");
		const stat = await insertVideoSnapshot(client, aid, task);
		if (typeof stat === "number") {
			// Non-zero bili status code: record it and stop here.
			await setBiliVideoStatus(client, aid, stat);
			await setSnapshotStatus(client, id, "completed");
			return `BILI_STATUS_${stat}`;
		}
		await setSnapshotStatus(client, id, "completed");
		if (type === "normal") {
			// Regular snapshots recur daily.
			await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
			return `DONE`;
		}
		if (type !== "milestone") return `DONE`;
		// Milestone snapshots recur based on the estimated time-to-milestone.
		const eta = await getAdjustedShortTermETA(client, aid);
		if (eta > 72) return "ETA_TOO_LONG";
		await scheduleSnapshot(client, aid, type, Date.now() + eta * HOUR);
		return `DONE`;
	} catch (e) {
		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
			logger.warn(
				`No available proxy for aid ${job.data.aid}.`,
				"mq",
				"fn:takeSnapshotForVideoWorker",
			);
			// Close out this schedule and retry later rather than failing it.
			await setSnapshotStatus(client, id, "completed");
			await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
			return;
		}
		logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
		await setSnapshotStatus(client, id, "failed");
	} finally {
		client.release();
	}
};

View File

@ -1 +0,0 @@
export * from "lib/mq/exec/getLatestVideos.ts";

View File

@ -1,39 +0,0 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
/**
 * Register every repeatable job scheduler on the message queues.
 * Called once at worker startup; upsert semantics make it idempotent.
 */
export async function initMQ() {
	await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
		every: 1 * MINUTE,
		immediately: true,
	});
	await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
		every: 5 * MINUTE,
		immediately: true,
	});
	await LatestVideosQueue.upsertJobScheduler("collectSongs", {
		every: 3 * MINUTE,
		immediately: true,
	});
	// The tick fires every second; keep only the last completed/failed run so
	// the queue does not fill up with tick bookkeeping.
	await SnapshotQueue.upsertJobScheduler("snapshotTick", {
		every: 1 * SECOND,
		immediately: true,
	}, {
		opts: {
			removeOnComplete: 1,
			removeOnFail: 1,
		},
	});
	await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
		every: 5 * MINUTE,
		immediately: true,
	});
	await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
		every: 30 * MINUTE,
		immediately: true,
	});
	logger.log("Message queue initialized.");
}

13
main.ts
View File

@ -1,13 +0,0 @@
/// <reference no-default-lib="true" />
/// <reference lib="dom" />
/// <reference lib="dom.iterable" />
/// <reference lib="dom.asynciterable" />
/// <reference lib="deno.ns" />

// Production entry point for the Fresh app: load .env first (side-effect
// import), then boot the server with the generated route manifest and the
// shared Fresh config.
import "$std/dotenv/load.ts";
import { start } from "$fresh/server.ts";
import manifest from "./fresh.gen.ts";
import config from "./fresh.config.ts";
await start(manifest, config);

View File

View File

0
packages/core/deno.json Normal file
View File

View File

@ -1,6 +1,6 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
import Akari from "lib/ml/akari.ts";
import { AllDataType, BiliUserType } from "db/schema.d.ts";
import Akari from "ml/akari.ts";
export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>(
@ -75,3 +75,13 @@ export async function setBiliVideoStatus(client: Client, aid: number, status: nu
[status, aid],
);
}
/**
 * Fetch the bilibili status code recorded for a video;
 * returns 0 when the aid is not present in bilibili_metadata.
 */
export async function getBiliVideoStatus(client: Client, aid: number) {
	const { rows } = await client.queryObject<{ status: number }>(
		`SELECT status FROM bilibili_metadata WHERE aid = $1`,
		[aid],
	);
	return rows.length === 0 ? 0 : rows[0].status;
}

View File

@ -1,5 +1,5 @@
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { postgresConfig } from "db/pgConfig.ts";
const pool = new Pool(postgresConfig, 12);

View File

@ -1,5 +1,5 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
import { LatestSnapshotType } from "db/schema.d.ts";
export async function getVideosNearMilestone(client: Client) {
const queryResult = await client.queryObject<LatestSnapshotType>(`
@ -7,6 +7,7 @@ export async function getVideosNearMilestone(client: Client) {
FROM latest_video_snapshot ls
INNER JOIN
songs s ON ls.aid = s.aid
AND s.deleted = false
WHERE
s.deleted = false AND
(views >= 90000 AND views < 100000) OR
@ -22,11 +23,14 @@ export async function getVideosNearMilestone(client: Client) {
}
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
const queryResult = await client.queryObject<LatestSnapshotType>(`
const queryResult = await client.queryObject<LatestSnapshotType>(
`
SELECT *
FROM latest_video_snapshot
WHERE aid = $1
`, [aid]);
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
@ -35,6 +39,6 @@ export async function getLatestVideoSnapshot(client: Client, aid: number): Promi
...row,
aid: Number(row.aid),
time: new Date(row.time).getTime(),
}
};
})[0];
}

View File

@ -0,0 +1,309 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
import { SnapshotScheduleType } from "./schema.d.ts";
import logger from "log/logger.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { redis } from "db/redis.ts";
import { Redis } from "ioredis";
const REDIS_KEY = "cvsa:snapshot_window_counts";
/**
 * Index of the current 5-minute window within the local day (0-287).
 * Window 0 begins at local midnight.
 */
function getCurrentWindowIndex(): number {
	const now = new Date();
	const minuteOfDay = now.getHours() * 60 + now.getMinutes();
	return Math.floor(minuteOfDay / 5);
}
/**
 * Rebuilds the Redis hash of pending-schedule counts per 5-minute window.
 *
 * Buckets pending snapshot_schedule rows due within the next 10 days into
 * 5-minute windows and stores each bucket's count in REDIS_KEY, keyed by
 * the window's index (current window + offset from now).
 * @param client The database client.
 * @param redisClient The Redis connection backing the window-count cache.
 */
export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) {
	const now = new Date();
	const startTime = now.getTime();
	// NOTE: queryObject is invoked as a tagged template here (no bind params).
	const result = await client.queryObject<{ window_start: Date; count: number }>`
		SELECT
			date_trunc('hour', started_at) +
			(EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
			COUNT(*) AS count
		FROM snapshot_schedule
		WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
		GROUP BY 1
		ORDER BY window_start
	`
	// Drop the whole hash first so windows that became empty are forgotten.
	await redisClient.del(REDIS_KEY);
	const currentWindow = getCurrentWindowIndex();
	for (const row of result.rows) {
		// Whole 5-minute windows between "now" and this bucket's start.
		const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
		// Hash field = absolute window index (current index + relative offset);
		// negative values would refer to already-elapsed windows, skip them.
		const offset = (currentWindow + targetOffset);
		if (offset >= 0) {
			await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
		}
	}
}
/**
 * Primes the window-count cache and keeps it fresh:
 * one immediate refresh, then a refresh every 5 minutes.
 * @param client The database client.
 * @param redisClient The Redis connection backing the cache.
 */
export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
	await refreshSnapshotWindowCounts(client, redisClient);
	const refresh = () => refreshSnapshotWindowCounts(client, redisClient);
	setInterval(refresh, 5 * MINUTE);
}
/** Cached pending count for a window index; 0 when the field is absent. */
async function getWindowCount(redisClient: Redis, offset: number): Promise<number> {
	const raw = await redisClient.hget(REDIS_KEY, offset.toString());
	return raw ? parseInt(raw, 10) : 0;
}
/**
 * Whether a snapshot_schedule row with the given id still exists.
 * @param client The database client.
 * @param id Primary key of the schedule row.
 */
export async function snapshotScheduleExists(client: Client, id: number) {
	const result = await client.queryObject<{ id: number }>(
		`SELECT id FROM snapshot_schedule WHERE id = $1`,
		[id],
	);
	return result.rows.length > 0;
}
/**
 * Whether the video has any schedule in 'pending' or 'processing' state.
 * @param client The database client.
 * @param aid The aid of the video.
 */
export async function videoHasActiveSchedule(client: Client, aid: number) {
	const result = await client.queryObject<{ status: string }>(
		`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
		[aid],
	);
	return result.rows.length > 0;
}
/**
 * Whether the video has a schedule currently in 'processing' state.
 * @param client The database client.
 * @param aid The aid of the video.
 */
export async function videoHasProcessingSchedule(client: Client, aid: number) {
	const result = await client.queryObject<{ status: string }>(
		`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
		[aid],
	);
	return result.rows.length > 0;
}
/**
 * Filters `aids` down to videos that have NO schedule in 'processing' state.
 *
 * Fix: the previous query selected any aid owning at least one
 * non-processing schedule, which wrongly included videos that ALSO had a
 * processing schedule. The HAVING clause keeps an aid only when none of
 * its schedules are processing.
 * NOTE(review): aids with no schedule rows at all are still omitted
 * (unchanged from the original); current callers only pass aids taken from
 * existing schedules — verify if that assumption ever changes.
 * @param client The database client.
 * @param aids Candidate video aids.
 * @returns The subset of `aids` with no processing schedule.
 */
export async function bulkGetVideosWithoutProcessingSchedules(client: Client, aids: number[]) {
	const res = await client.queryObject<{ aid: number }>(
		`SELECT aid FROM snapshot_schedule WHERE aid = ANY($1) GROUP BY aid HAVING bool_and(status != 'processing')`,
		[aids],
	);
	return res.rows.map((row) => row.aid);
}
// Minimal snapshot projection used by the trend/ETA helpers below.
interface Snapshot {
	created_at: number; // snapshot creation time (Unix ms)
	views: number; // view count recorded at that time
}
/**
 * Snapshot of `aid` whose created_at is nearest to `targetTime`,
 * on either side of it.
 * @returns The closest snapshot, or null when the video has none.
 */
export async function findClosestSnapshot(
	client: Client,
	aid: number,
	targetTime: Date,
): Promise<Snapshot | null> {
	const sql = `
		SELECT created_at, views
		FROM video_snapshot
		WHERE aid = $1
		ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz)))
		LIMIT 1
	`;
	const result = await client.queryObject<{ created_at: string; views: number }>(
		sql,
		[aid, targetTime.toISOString()],
	);
	const match = result.rows[0];
	if (!match) return null;
	return {
		created_at: new Date(match.created_at).getTime(),
		views: match.views,
	};
}
/**
 * Latest snapshot of `aid` taken at or before `targetTime`.
 * @returns That snapshot, or null when none exists before the cutoff.
 */
export async function findSnapshotBefore(
	client: Client,
	aid: number,
	targetTime: Date,
): Promise<Snapshot | null> {
	const sql = `
		SELECT created_at, views
		FROM video_snapshot
		WHERE aid = $1
		AND created_at <= $2::timestamptz
		ORDER BY created_at DESC
		LIMIT 1
	`;
	const result = await client.queryObject<{ created_at: string; views: number }>(
		sql,
		[aid, targetTime.toISOString()],
	);
	const match = result.rows[0];
	if (!match) return null;
	return {
		created_at: new Date(match.created_at).getTime(),
		views: match.views,
	};
}
/**
 * Whether the video has at least two snapshots.
 *
 * Improvement: counting inside a LIMIT 2 subquery lets Postgres stop after
 * two rows instead of counting every snapshot of a popular video; the
 * result is also coerced with Number() since COUNT(*) arrives as bigint.
 * @param client The database client.
 * @param aid The aid of the video.
 */
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
	const res = await client.queryObject<{ count: number }>(
		`SELECT COUNT(*) AS count FROM (SELECT 1 FROM video_snapshot WHERE aid = $1 LIMIT 2) t`,
		[aid],
	);
	return Number(res.rows[0].count) >= 2;
}
/**
 * Most recent snapshot of a video.
 * @returns The latest snapshot, or null when the video has none.
 */
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
	const res = await client.queryObject<{ created_at: string; views: number }>(
		`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
		[aid],
	);
	const latest = res.rows[0];
	if (!latest) return null;
	return {
		created_at: new Date(latest.created_at).getTime(),
		views: latest.views,
	};
}
/**
 * Returns the number of pending snapshot schedules within the given range.
 * @param client The database client.
 * @param start The start time of the range. (Timestamp in milliseconds)
 * @param end The end time of the range. (Timestamp in milliseconds)
 */
export async function getSnapshotScheduleCountWithinRange(client: Client, start: number, end: number) {
	const res = await client.queryObject<{ count: number }>(
		`
		SELECT COUNT(*) FROM snapshot_schedule
		WHERE started_at BETWEEN $1 AND $2
		AND status = 'pending'
	`,
		[formatTimestampToPsql(start), formatTimestampToPsql(end)],
	);
	return res.rows[0].count;
}
/**
 * Creates a new snapshot schedule record.
 * @param client The database client.
 * @param aid The aid of the video.
 * @param type Schedule type (e.g. "milestone", "new", "normal").
 * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
 * @param force When true, schedule even if an active schedule already exists.
 */
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) {
	const hasActive = await videoHasActiveSchedule(client, aid);
	if (hasActive && !force) return;
	// Milestone/new snapshots keep their exact requested time; everything
	// else gets shifted into the least-loaded 5-minute window.
	const keepExactTime = type === "milestone" || type === "new";
	const adjustedTime = keepExactTime
		? new Date(targetTime)
		: await adjustSnapshotTime(new Date(targetTime), 1000, redis);
	logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
	return client.queryObject(
		`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
		[aid, type, adjustedTime.toISOString()],
	);
}
/**
 * Schedules snapshots for many videos at the same target time.
 * Awaited one-by-one so each call sees the window counts updated by the
 * previous one.
 */
export async function bulkScheduleSnapshot(client: Client, aids: number[], type: string, targetTime: number, force: boolean = false) {
	for (let i = 0; i < aids.length; i++) {
		await scheduleSnapshot(client, aids[i], type, targetTime, force);
	}
}
/**
 * Picks the actual start time for a snapshot near `expectedStartTime`,
 * spreading load across the 5-minute windows tracked in Redis.
 *
 * Walks forward from the window nearest the expected time (clamped to the
 * current window) until it finds one holding fewer than `allowedCounts`
 * pending schedules, reserves a slot in it, and returns a random moment
 * inside that window. Falls back to `expectedStartTime` unchanged when
 * every window up to MAX_ITERATIONS is full.
 *
 * NOTE(review): `allowedCounts` has a default but is followed by the
 * required `redisClient` parameter, so the default can never actually be
 * used by callers — consider reordering (confirm all call sites first).
 * @param expectedStartTime The desired snapshot time.
 * @param allowedCounts Max schedules tolerated per 5-minute window.
 * @param redisClient Redis connection holding the window counts.
 * @returns The adjusted start time (possibly "now" for past windows).
 */
export async function adjustSnapshotTime(
	expectedStartTime: Date,
	allowedCounts: number = 1000,
	redisClient: Redis,
): Promise<Date> {
	const currentWindow = getCurrentWindowIndex();
	// Offset of the expected time from now in 5-minute windows, pulled back
	// by 6 windows (30 min) — presumably to bias snapshots earlier; TODO
	// confirm the intent of the -6 correction.
	const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
	const initialOffset = currentWindow + Math.max(targetOffset, 0);
	let timePerIteration = 0;
	const MAX_ITERATIONS = 2880;
	let iters = 0;
	const t = performance.now();
	for (let i = initialOffset; i < MAX_ITERATIONS; i++) {
		iters++;
		const offset = i;
		const count = await getWindowCount(redisClient, offset);
		if (count < allowedCounts) {
			// Reserve a slot immediately so concurrent schedulers see it.
			await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
			const startPoint = new Date();
			startPoint.setHours(0, 0, 0, 0); // today's local midnight
			const startTime = startPoint.getTime();
			const windowStart = startTime + offset * 5 * MINUTE;
			// Randomize within the window to avoid a thundering herd.
			const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
			const delayedDate = new Date(windowStart + randomDelay);
			const now = new Date();
			// Chosen window already passed: snapshot immediately.
			if (delayedDate.getTime() < now.getTime()) {
				const elapsed = performance.now() - t;
				timePerIteration = elapsed / (i+1);
				logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime");
				return now;
			}
			const elapsed = performance.now() - t;
			timePerIteration = elapsed / (i+1);
			logger.log(`${timePerIteration.toFixed(3)}ms * ${iters} iterations`, "perf", "fn:adjustSnapshotTime");
			return delayedDate;
		}
	}
	// Every candidate window was full — keep the caller's requested time.
	const elapsed = performance.now() - t;
	timePerIteration = elapsed / MAX_ITERATIONS;
	logger.log(`${timePerIteration.toFixed(3)}ms * ${MAX_ITERATIONS} iterations`, "perf", "fn:adjustSnapshotTime");
	return expectedStartTime;
}
/**
 * Pending non-"normal" schedules due within the next second,
 * milestone schedules first, capped at 10 per tick.
 */
export async function getSnapshotsInNextSecond(client: Client) {
	const res = await client.queryObject<SnapshotScheduleType>(
		`
		SELECT *
		FROM snapshot_schedule
		WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal'
		ORDER BY
			CASE
				WHEN type = 'milestone' THEN 0
				ELSE 1
			END,
			started_at
		LIMIT 10;
	`,
		[],
	);
	return res.rows;
}
/**
 * Pending "normal" (bulk) schedules due within the next 15 seconds,
 * oldest first, capped at 1000.
 */
export async function getBulkSnapshotsInNextSecond(client: Client) {
	const res = await client.queryObject<SnapshotScheduleType>(
		`
		SELECT *
		FROM snapshot_schedule
		WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
		ORDER BY started_at
		LIMIT 1000;
	`,
		[],
	);
	return res.rows;
}
/** Updates the status of a single snapshot schedule row. */
export async function setSnapshotStatus(client: Client, id: number, status: string) {
	const result = await client.queryObject(
		`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
		[id, status],
	);
	return result;
}
/** Updates the status of every schedule row whose id is in `ids`. */
export async function bulkSetSnapshotStatus(client: Client, ids: number[], status: string) {
	const result = await client.queryObject(
		`UPDATE snapshot_schedule SET status = $2 WHERE id = ANY($1)`,
		[ids, status],
	);
	return result;
}
/**
 * Aids of songs that currently have no pending or processing
 * snapshot schedule.
 */
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
	const sql = `
		SELECT s.aid
		FROM songs s
		LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
		WHERE ss.aid IS NULL
	`;
	const res = await client.queryObject<{ aid: number }>(sql, []);
	return res.rows.map((row) => Number(row.aid));
}

View File

@ -1,4 +1,5 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(`
@ -27,3 +28,18 @@ export async function aidExistsInSongs(client: Client, aid: number) {
);
return queryResult.rows[0].exists;
}
/**
 * Publication timestamp of a song, parsed from its Postgres representation.
 * (The "Publihsed" typo is kept in the name for caller compatibility.)
 * @returns The parsed timestamp, or null when the aid is not in songs.
 */
export async function getSongsPublihsedAt(client: Client, aid: number) {
	const result = await client.queryObject<{ published_at: string }>(
		`
		SELECT published_at
		FROM songs
		WHERE aid = $1;
	`,
		[aid],
	);
	const row = result.rows[0];
	return row ? parseTimestampFromPsql(row.published_at) : null;
}

View File

@ -0,0 +1,50 @@
{
"name": "@cvsa/crawler",
"tasks": {
"crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts",
"crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts",
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
"manifest": "deno task cli manifest $(pwd)",
"start": "deno run -A --watch=static/,routes/ dev.ts",
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
},
"lint": {
"rules": {
"tags": ["recommended"]
}
},
"imports": {
"@std/assert": "jsr:@std/assert@1",
"$std/": "https://deno.land/std@0.216.0/",
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq",
"mq/": "./mq/",
"db/": "./db/",
"log/": "./log/",
"net/": "./net/",
"ml/": "./ml/",
"utils/": "./utils/",
"ioredis": "npm:ioredis",
"@bull-board/api": "npm:@bull-board/api",
"@bull-board/express": "npm:@bull-board/express",
"express": "npm:express",
"src/": "./src/",
"onnxruntime": "npm:onnxruntime-node@1.19.2",
"chalk": "npm:chalk"
},
"fmt": {
"useTabs": true,
"lineWidth": 120,
"indentWidth": 4,
"semiColons": true,
"proseWrap": "always"
},
"exports": "./main.ts"
}

View File

@ -1,4 +1,4 @@
import logger from "lib/log/logger.ts";
import logger from "log/logger.ts";
logger.error(Error("test error"), "test service");
logger.debug(`some string`);

7
packages/crawler/main.ts Normal file
View File

@ -0,0 +1,7 @@
// Deno requires a module export when a workspace member declares a 'name',
// and omitting 'name' breaks the --filter flag in `deno task`.
// Reason unknown; this placeholder export exists purely to satisfy Deno:
export const DENO = "FUCK YOU DENO";
// Exposing the crawler version alongside it is genuinely useful.
export const VERSION = "1.0.13";

View File

@ -1,12 +1,12 @@
import { AIManager } from "lib/ml/manager.ts";
import { AIManager } from "ml/manager.ts";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
import logger from "log/logger.ts";
import { WorkerError } from "mq/schema.ts";
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/akari/3.17.onnx";
const onnxEmbeddingPath = "./model/embedding/model.onnx";
const onnxClassifierPath = "../../model/akari/3.17.onnx";
const onnxEmbeddingPath = "../../model/embedding/model.onnx";
class AkariProto extends AIManager {
private tokenizer: PreTrainedTokenizer | null = null;

View File

@ -1,6 +1,6 @@
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
import logger from "log/logger.ts";
import { WorkerError } from "mq/schema.ts";
export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {};

View File

@ -1,12 +1,14 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
import Akari from "lib/ml/akari.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { aidExistsInSongs } from "lib/db/songs.ts";
import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
import { db } from "db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts";
import Akari from "ml/akari.ts";
import { ClassifyVideoQueue } from "mq/index.ts";
import logger from "log/logger.ts";
import { lockManager } from "mq/lockManager.ts";
import { aidExistsInSongs } from "db/songs.ts";
import { insertIntoSongs } from "mq/task/collectSongs.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect();
@ -27,6 +29,7 @@ export const classifyVideoWorker = async (job: Job) => {
const exists = await aidExistsInSongs(client, aid);
if (!exists && label !== 0) {
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
await insertIntoSongs(client, aid);
}

View File

@ -1,8 +1,8 @@
import { Job } from "bullmq";
import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts";
import { db } from "lib/db/init.ts";
import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts";
import { collectSongs } from "lib/mq/task/collectSongs.ts";
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
import { db } from "db/init.ts";
import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
import { collectSongs } from "mq/task/collectSongs.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
const client = await db.connect();

View File

@ -0,0 +1,397 @@
import { Job } from "bullmq";
import { db } from "db/init.ts";
import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
import {
bulkGetVideosWithoutProcessingSchedules,
bulkScheduleSnapshot,
bulkSetSnapshotStatus,
findClosestSnapshot,
findSnapshotBefore,
getLatestSnapshot,
getSnapshotsInNextSecond,
getVideosWithoutActiveSnapshotSchedule,
hasAtLeast2Snapshots,
scheduleSnapshot,
setSnapshotStatus,
snapshotScheduleExists,
videoHasProcessingSchedule,
getBulkSnapshotsInNextSecond
} from "db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
import logger from "log/logger.ts";
import { SnapshotQueue } from "mq/index.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { NetSchedulerError } from "mq/scheduler.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
import { truncate } from "utils/truncate.ts";
import { lockManager } from "mq/lockManager.ts";
import { getSongsPublihsedAt } from "db/songs.ts";
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
// BullMQ job priority per schedule type (lower number = higher priority).
const priorityMap: { [key: string]: number } = {
	"milestone": 1,
	"normal": 3,
};
// Maps a schedule type to the NetScheduler task name used for the fetch.
const snapshotTypeToTaskMap: { [key: string]: string } = {
	"milestone": "snapshotMilestoneVideo",
	"normal": "snapshotVideo",
	"new": "snapshotMilestoneVideo",
};
/**
 * Tick worker: drains due bulk ("normal") schedules and enqueues them for
 * bulk fetching in groups of up to 30 aids.
 */
export const bulkSnapshotTickWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const schedules = await getBulkSnapshotsInNextSecond(client);
		const groupSize = 30;
		for (let start = 0; start < schedules.length; start += groupSize) {
			const group = schedules.slice(start, start + groupSize);
			const aids = group.map((schedule) => Number(schedule.aid));
			const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
			if (filteredAids.length === 0) continue;
			await bulkSetSnapshotStatus(client, filteredAids, "processing");
			// Map schedule id -> aid so the bulk worker can close each row.
			const dataMap: { [key: number]: number } = {};
			for (const schedule of group) {
				dataMap[Number(schedule.id)] = Number(schedule.aid);
			}
			await SnapshotQueue.add("bulkSnapshotVideo", {
				map: dataMap,
			}, { priority: 3 });
		}
	} catch (e) {
		logger.error(e as Error);
	} finally {
		client.release();
	}
};
/**
 * Tick worker: dispatches due non-bulk schedules to the snapshot queue,
 * milestone schedules at the highest priority.
 *
 * Fix: the previous `return \`ALREADY_PROCESSING\`` inside the loop aborted
 * the WHOLE tick as soon as one video already had a processing schedule,
 * starving every other due schedule until that video finished. We now skip
 * just that schedule and keep dispatching the rest.
 */
export const snapshotTickWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const schedules = await getSnapshotsInNextSecond(client);
		for (const schedule of schedules) {
			// Skip videos that are already being snapshotted.
			if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
				continue;
			}
			let priority = 3;
			if (schedule.type && priorityMap[schedule.type]) {
				priority = priorityMap[schedule.type];
			}
			const aid = Number(schedule.aid);
			await setSnapshotStatus(client, Number(schedule.id), "processing");
			await SnapshotQueue.add("snapshotVideo", {
				aid: aid,
				id: Number(schedule.id),
				type: schedule.type ?? "normal",
			}, { priority });
		}
	} catch (e) {
		logger.error(e as Error);
	} finally {
		client.release();
	}
};
/**
 * Next view-count milestone (100k / 1M / 10M) for a given view count.
 * (Name keeps the existing "closet" typo for caller compatibility.)
 */
export const closetMilestone = (views: number) => {
	if (views < 100000) return 100000;
	return views < 1000000 ? 1000000 : 10000000;
};
/** Logarithm of `value` in an arbitrary `base` (defaults to base 10). */
const log = (value: number, base: number = 10): number => {
	return Math.log(value) / Math.log(base);
};
/**
 * Returns the minimum ETA in hours until the video reaches its next
 * view-count milestone.
 *
 * Estimates growth speed from the latest snapshot against snapshots taken
 * over several look-back intervals and converts the remaining views into an
 * ETA; the smallest (most urgent) estimate wins.
 * @param client - Postgres client
 * @param aid - aid of the video
 * @returns ETA in hours; 0 forces an immediate snapshot, Infinity when
 *          growth cannot be estimated.
 */
const getAdjustedShortTermETA = async (client: Client, aid: number) => {
	const latestSnapshot = await getLatestSnapshot(client, aid);
	// Immediately dispatch a snapshot if there is no snapshot yet
	if (!latestSnapshot) return 0;
	// Need at least two data points to estimate a growth speed.
	const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
	if (!snapshotsEnough) return 0;
	const currentTimestamp = new Date().getTime();
	const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
	const DELTA = 0.00001; // guards divisions against zero denominators
	let minETAHours = Infinity;
	for (const timeInterval of timeIntervals) {
		const date = new Date(currentTimestamp - timeInterval);
		const snapshot = await findClosestSnapshot(client, aid, date);
		if (!snapshot) continue;
		const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
		const viewsDiff = latestSnapshot.views - snapshot.views;
		if (viewsDiff <= 0) continue; // no growth over this interval
		const speed = viewsDiff / (hoursDiff + DELTA); // views per hour
		const target = closetMilestone(latestSnapshot.views);
		const viewsToIncrease = target - latestSnapshot.views;
		const eta = viewsToIncrease / (speed + DELTA);
		// Damping factor that shortens the ETA as the milestone nears.
		// NOTE(review): constants 2.97 / 1.14 look hand-tuned — confirm origin.
		let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
		factor = truncate(factor, 3, 100);
		const adjustedETA = eta / factor;
		if (adjustedETA < minETAHours) {
			minETAHours = adjustedETA;
		}
	}
	if (isNaN(minETAHours)) {
		minETAHours = Infinity;
	}
	return minETAHours;
};
/**
 * Periodic worker: schedules milestone snapshots for videos approaching
 * their next view milestone, skipping those estimated more than 72h away.
 */
export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const videos = await getVideosNearMilestone(client);
		for (const video of videos) {
			const aid = Number(video.aid);
			const eta = await getAdjustedShortTermETA(client, aid);
			if (eta > 72) continue;
			// Clamp the delay until the next snapshot into [1s, 4h].
			const now = Date.now();
			const delay = truncate(eta * HOUR, 1 * SECOND, 4 * HOUR);
			await scheduleSnapshot(client, aid, "milestone", now + delay);
		}
	} catch (e) {
		logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
	} finally {
		client.release();
	}
};
/**
 * Interval in hours until a video's next regular snapshot, derived from
 * its view growth over roughly the last 24 hours.
 * Returns 0 (snapshot as soon as possible) when no usable pair of
 * snapshots exists.
 */
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
	const now = Date.now();
	const date = new Date(now - 24 * HOUR);
	let oldSnapshot = await findSnapshotBefore(client, aid, date);
	if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
	const latestSnapshot = await getLatestSnapshot(client, aid);
	if (!oldSnapshot || !latestSnapshot) return 0;
	if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
	const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
	// Observation window too short to judge the trend: back off a day.
	if (hoursDiff < 8) return 24;
	const viewsDiff = latestSnapshot.views - oldSnapshot.views;
	if (viewsDiff === 0) return 72;
	const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
	// Faster-growing videos get shorter intervals.
	const tiers: [number, number][] = [[6, 36], [120, 24], [320, 12]];
	for (const [limit, hours] of tiers) {
		if (speedPerDay < limit) return hours;
	}
	return 6;
};
export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
const startedAt = Date.now();
if (await lockManager.isLocked("dispatchRegularSnapshots")) {
logger.log("dispatchRegularSnapshots is already running", "mq");
client.release();
return;
}
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
try {
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime);
if (now - startedAt > 25 * MINUTE) {
return;
}
}
} catch (e) {
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
} finally {
lockManager.releaseLock("dispatchRegularSnapshots");
client.release();
}
};
/**
 * Worker: takes snapshots for a batch of videos with one bulk API request.
 *
 * `job.data.map` is a schedule-id -> aid mapping produced by
 * bulkSnapshotTickWorker. Schedules deleted since enqueueing are skipped.
 * On a numeric (status-code) result the whole batch is marked failed and
 * rescheduled in 15 seconds; on success each video's next regular snapshot
 * is chained from its growth speed.
 */
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
	const dataMap: { [key: number]: number } = job.data.map;
	const ids = Object.keys(dataMap).map((id) => Number(id));
	const aidsToFetch: number[] = [];
	const client = await db.connect();
	try {
		// Only fetch aids whose schedule rows still exist.
		for (const id of ids) {
			const aid = Number(dataMap[id]);
			const exists = await snapshotScheduleExists(client, id);
			if (!exists) {
				continue;
			}
			aidsToFetch.push(aid);
		}
		const data = await bulkGetVideoStats(aidsToFetch);
		// A numeric result is a bilibili status code rather than stats.
		if (typeof data === "number") {
			await bulkSetSnapshotStatus(client, ids, "failed");
			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
			return `GET_BILI_STATUS_${data}`;
		}
		for (const video of data) {
			const aid = video.id;
			const stat = video.cnt_info;
			const views = stat.play;
			const danmakus = stat.danmaku;
			const replies = stat.reply;
			const likes = stat.thumb_up;
			const coins = stat.coin;
			const shares = stat.share;
			const favorites = stat.collect;
			const query: string = `
				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
			`;
			await client.queryObject(
				query,
				[aid, views, danmakus, replies, likes, coins, shares, favorites],
			);
			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
		}
		await bulkSetSnapshotStatus(client, ids, "completed");
		// Chain the next regular snapshot for each fetched video.
		for (const aid of aidsToFetch) {
			const interval = await getRegularSnapshotInterval(client, aid);
			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
		}
		return `DONE`;
	} catch (e) {
		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
			logger.warn(
				`No available proxy for bulk request now.`,
				"mq",
				"fn:takeBulkSnapshotForVideosWorker",
			);
			// NOTE(review): marks the batch "completed" although nothing was
			// fetched, relying on the 2-minute reschedule below — confirm this
			// status choice is intentional (vs. e.g. "failed").
			await bulkSetSnapshotStatus(client, ids, "completed");
			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
			return;
		}
		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
		await bulkSetSnapshotStatus(client, ids, "failed");
	}
	finally {
		client.release();
	}
};
/**
 * Worker: takes a single snapshot for one video.
 *
 * Per schedule type:
 * - "normal": chains the next regular snapshot from the growth-based interval.
 * - "new": re-snapshots a freshly published video on a views-per-hour-based
 *   interval, for up to 48h after publication.
 * - "milestone": chains the next snapshot from the milestone ETA.
 * On NO_PROXY_AVAILABLE the schedule is closed and a retry is scheduled
 * (5s for milestone, 2min otherwise).
 */
export const takeSnapshotForVideoWorker = async (job: Job) => {
	const id = job.data.id;
	const aid = Number(job.data.aid);
	const type = job.data.type;
	const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
	const client = await db.connect();
	const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
	// The schedule row may have been cleaned up since enqueueing.
	const exists = await snapshotScheduleExists(client, id);
	if (!exists) {
		client.release();
		return;
	}
	// Skip videos bilibili reports as unavailable (non-zero status).
	const status = await getBiliVideoStatus(client, aid);
	if (status !== 0) {
		client.release();
		return `REFUSE_WORKING_BILI_STATUS_${status}`;
	}
	try {
		await setSnapshotStatus(client, id, "processing");
		const stat = await insertVideoSnapshot(client, aid, task);
		// A numeric result is a bilibili status code: record it and stop.
		if (typeof stat === "number") {
			await setBiliVideoStatus(client, aid, stat);
			await setSnapshotStatus(client, id, "completed");
			return `GET_BILI_STATUS_${stat}`;
		}
		await setSnapshotStatus(client, id, "completed");
		if (type === "normal") {
			const interval = await getRegularSnapshotInterval(client, aid);
			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
			await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
			return `DONE`;
		} else if (type === "new") {
			const publihsedAt = await getSongsPublihsedAt(client, aid);
			// NOTE(review): assumes `stat.time` and `publihsedAt` share the same
			// unit (ms) — confirm; the non-null assertion also assumes the aid
			// is always present in songs for "new" schedules.
			const timeSincePublished = stat.time - publihsedAt!;
			const viewsPerHour = stat.views / timeSincePublished * HOUR;
			if (timeSincePublished > 48 * HOUR) {
				return `DONE`;
			}
			if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
				return `DONE`;
			}
			// Hotter videos are re-snapshotted more frequently.
			let intervalMins = 240;
			if (viewsPerHour > 50) {
				intervalMins = 120;
			}
			if (viewsPerHour > 100) {
				intervalMins = 60;
			}
			if (viewsPerHour > 1000) {
				intervalMins = 15;
			}
			await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
		}
		if (type !== "milestone") return `DONE`;
		const eta = await getAdjustedShortTermETA(client, aid);
		if (eta > 72) return "ETA_TOO_LONG";
		const now = Date.now();
		const targetTime = now + eta * HOUR;
		await scheduleSnapshot(client, aid, type, targetTime);
		return `DONE`;
	} catch (e) {
		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
			logger.warn(
				`No available proxy for aid ${job.data.aid}.`,
				"mq",
				"fn:takeSnapshotForVideoWorker",
			);
			await setSnapshotStatus(client, id, "completed");
			await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
			return;
		}
		logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
		await setSnapshotStatus(client, id, "failed");
	} finally {
		client.release();
	}
};
/**
 * Periodic worker: requeues schedules stuck in pending/processing for more
 * than 5 minutes by marking them "timeout" and retrying in 10 seconds.
 */
export const scheduleCleanupWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const stuckQuery = `
			SELECT id, aid, type
			FROM snapshot_schedule
			WHERE status IN ('pending', 'processing')
				AND started_at < NOW() - INTERVAL '5 minutes'
		`;
		const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(stuckQuery);
		for (const { id: rawId, aid: rawAid, type } of rows) {
			const id = Number(rawId);
			const aid = Number(rawAid);
			await setSnapshotStatus(client, id, "timeout");
			await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
			logger.log(
				`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
				"mq",
				"fn:scheduleCleanupWorker",
			);
		}
	} catch (e) {
		logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
	} finally {
		client.release();
	}
};

View File

@ -0,0 +1 @@
export * from "mq/exec/getLatestVideos.ts";

View File

@ -0,0 +1,67 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
import logger from "log/logger.ts";
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
import { db } from "db/init.ts";
import { redis } from "db/redis.ts";
/**
 * Registers all repeatable jobs on the message queues and primes the
 * snapshot window-count cache before the schedulers start firing.
 */
export async function initMQ() {
	const client = await db.connect();
	try {
		await initSnapshotWindowCounts(client, redis);
		// High-frequency tick jobs keep only their last run in Redis.
		const keepOnlyLastRun = {
			opts: {
				removeOnComplete: 1,
				removeOnFail: 1,
			},
		};
		await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
			every: 1 * MINUTE,
			immediately: true,
		});
		await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
			every: 5 * MINUTE,
			immediately: true,
		});
		await LatestVideosQueue.upsertJobScheduler("collectSongs", {
			every: 3 * MINUTE,
			immediately: true,
		});
		await SnapshotQueue.upsertJobScheduler("snapshotTick", {
			every: 1 * SECOND,
			immediately: true,
		}, keepOnlyLastRun);
		await SnapshotQueue.upsertJobScheduler("bulkSnapshotTick", {
			every: 15 * SECOND,
			immediately: true,
		}, keepOnlyLastRun);
		await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
			every: 5 * MINUTE,
			immediately: true,
		});
		await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
			every: 30 * MINUTE,
			immediately: true,
		});
		await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
			every: 30 * MINUTE,
			immediately: true,
		});
		logger.log("Message queue initialized.");
	} finally {
		client.release();
	}
}

View File

@ -1,5 +1,5 @@
import { Redis } from "ioredis";
import { redis } from "lib/db/redis.ts";
import { redis } from "db/redis.ts";
class LockManager {
private redis: Redis;

View File

@ -1,4 +1,4 @@
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
import { SlidingWindow } from "mq/slidingWindow.ts";
export interface RateLimiterConfig {
window: SlidingWindow;

View File

@ -1,10 +1,9 @@
import logger from "lib/log/logger.ts";
import { RateLimiter, RateLimiterConfig } from "lib/mq/rateLimiter.ts";
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
import { redis } from "lib/db/redis.ts";
import logger from "log/logger.ts";
import { RateLimiter, RateLimiterConfig } from "mq/rateLimiter.ts";
import { SlidingWindow } from "mq/slidingWindow.ts";
import { redis } from "db/redis.ts";
import Redis from "ioredis";
import { SECOND } from "$std/datetime/constants.ts";
import { randomUUID } from "node:crypto";
interface Proxy {
type: string;
@ -334,6 +333,18 @@ const biliLimiterConfig: RateLimiterConfig[] = [
},
];
const bili_test = [...biliLimiterConfig];
bili_test[0].max = 10;
bili_test[1].max = 36;
bili_test[2].max = 150;
bili_test[3].max = 1000;
const bili_strict = [...biliLimiterConfig];
bili_strict[0].max = 1;
bili_strict[1].max = 4;
bili_strict[2].max = 12;
bili_strict[3].max = 100;
/*
Execution order for setup:
@ -365,7 +376,15 @@ for (const region of regions) {
netScheduler.addTask("getVideoInfo", "bilibili", "all");
netScheduler.addTask("getLatestVideos", "bilibili", "all");
netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`));
netScheduler.addTask("snapshotVideo", "bilibili", [
netScheduler.addTask("snapshotVideo", "bili_test", [
"alicloud-qingdao",
"alicloud-shanghai",
"alicloud-zhangjiakou",
"alicloud-chengdu",
"alicloud-shenzhen",
"alicloud-hohhot",
]);
netScheduler.addTask("bulkSnapshot", "bili_strict", [
"alicloud-qingdao",
"alicloud-shanghai",
"alicloud-zhangjiakou",
@ -376,7 +395,10 @@ netScheduler.addTask("snapshotVideo", "bilibili", [
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
netScheduler.setTaskLimiter("getLatestVideos", null);
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig);
netScheduler.setTaskLimiter("snapshotVideo", null);
netScheduler.setTaskLimiter("bulkSnapshot", null);
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
netScheduler.setProviderLimiter("bili_test", bili_test);
netScheduler.setProviderLimiter("bili_strict", bili_strict);
export default netScheduler;

View File

@ -1,6 +1,8 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
import logger from "lib/log/logger.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
import logger from "log/logger.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export async function collectSongs(client: Client) {
const aids = await getNotCollectedSongs(client);
@ -8,6 +10,7 @@ export async function collectSongs(client: Client) {
const exists = await aidExistsInSongs(client, aid);
if (exists) continue;
await insertIntoSongs(client, aid);
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
}
}

View File

@ -1,9 +1,9 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoDetails } from "lib/net/getVideoDetails.ts";
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import logger from "lib/log/logger.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts";
import { getVideoDetails } from "net/getVideoDetails.ts";
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
import logger from "log/logger.ts";
import { ClassifyVideoQueue } from "mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts";
export async function insertVideoInfo(client: Client, aid: number) {

View File

@ -1,7 +1,7 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
import logger from "lib/log/logger.ts";
import { getVideoInfo } from "net/getVideoInfo.ts";
import { LatestSnapshotType } from "db/schema.d.ts";
import logger from "log/logger.ts";
/*
* Fetch video stats from bilibili API and insert into database

View File

@ -1,10 +1,10 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getLatestVideoAids } from "lib/net/getLatestVideoAids.ts";
import { videoExistsInAllData } from "lib/db/allData.ts";
import { sleep } from "lib/utils/sleep.ts";
import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
import { videoExistsInAllData } from "db/allData.ts";
import { sleep } from "utils/sleep.ts";
import { SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
import { LatestVideosQueue } from "lib/mq/index.ts";
import logger from "log/logger.ts";
import { LatestVideosQueue } from "mq/index.ts";
export async function queueLatestVideos(
client: Client,

View File

@ -9,6 +9,25 @@ export type VideoListResponse = BaseResponse<VideoListData>;
export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
export type VideoTagsResponse = BaseResponse<VideoTagsData>;
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
export type MediaListInfoData = MediaListInfoItem[];
export interface MediaListInfoItem {
attr: number;
bvid: string;
id: number;
cnt_info: {
coin: number;
collect: number;
danmaku: number;
play: number;
reply: number;
share: number;
thumb_up: number;
}
}
interface VideoInfoData {
bvid: string;

View File

@ -0,0 +1,27 @@
import netScheduler from "mq/scheduler.ts";
import { MediaListInfoData, MediaListInfoResponse } from "net/bilibili.d.ts";
import logger from "log/logger.ts";
/*
* Bulk fetch video metadata from bilibili API
* @param {number[]} aids - The aid list to fetch
* @returns {Promise<MediaListInfoData | number>} MediaListInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function bulkGetVideoStats(aids: number[]): Promise<MediaListInfoData | number> {
const baseURL = `https://api.bilibili.com/medialist/gateway/base/resource/infos?resources=`;
let url = baseURL;
for (const aid of aids) {
url += `${aid}:2,`;
}
const data = await netScheduler.request<MediaListInfoResponse>(url, "bulkSnapshot");
const errMessage = `Error fetching metadata for aid list: ${aids.join(",")}:`;
if (data.code !== 0) {
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");
return data.code;
}
return data.data;
}

View File

@ -1,6 +1,6 @@
import { VideoListResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
import netScheduler from "lib/mq/scheduler.ts";
import { VideoListResponse } from "net/bilibili.d.ts";
import logger from "log/logger.ts";
import netScheduler from "mq/scheduler.ts";
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
const startFrom = 1 + pageSize * (page - 1);

View File

@ -1,6 +1,6 @@
import netScheduler from "lib/mq/scheduler.ts";
import { VideoDetailsData, VideoDetailsResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
import netScheduler from "mq/scheduler.ts";
import { VideoDetailsData, VideoDetailsResponse } from "net/bilibili.d.ts";
import logger from "log/logger.ts";
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;

View File

@ -1,6 +1,6 @@
import netScheduler from "lib/mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
import netScheduler from "mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "net/bilibili.d.ts";
import logger from "log/logger.ts";
/*
* Fetch video metadata from bilibili API

View File

@ -2,7 +2,7 @@ import express from "express";
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
import { ExpressAdapter } from "@bull-board/express";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/");

View File

@ -1,10 +1,10 @@
import { ConnectionOptions, Job, Worker } from "bullmq";
import { redis } from "lib/db/redis.ts";
import logger from "lib/log/logger.ts";
import { classifyVideosWorker, classifyVideoWorker } from "lib/mq/exec/classifyVideo.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import Akari from "lib/ml/akari.ts";
import { redis } from "db/redis.ts";
import logger from "log/logger.ts";
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
import { WorkerError } from "mq/schema.ts";
import { lockManager } from "mq/lockManager.ts";
import Akari from "ml/akari.ts";
Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");

Some files were not shown because too many files have changed in this diff Show More