Compare commits
31 Commits
36bee46f9d
...
cc41aa45b1
Author | SHA1 | Date | |
---|---|---|---|
cc41aa45b1 | |||
13198a49e8 | |||
003249ac4d | |||
0614067278 | |||
6df6345ec1 | |||
bae1f84bea | |||
21c918f1fa | |||
f1651fee30 | |||
d0b7d93e5b | |||
7a7c5cada9 | |||
10b761e3db | |||
1f6411b512 | |||
9ef513eed7 | |||
d80a6bfcd9 | |||
7a6892ae8e | |||
f4d08e944a | |||
b5dbf293a2 | |||
fc90dad185 | |||
0b36f52c6c | |||
445886815a | |||
8e7a1c3076 | |||
71ed0bd66b | |||
b76d8e589c | |||
69fb3604b1 | |||
d98e24b62f | |||
c4c9a3a440 | |||
da1bea7f41 | |||
38c0cbd371 | |||
a90747878e | |||
dd720b18fa | |||
3a83df7954 |
@ -17,6 +17,8 @@
|
||||
<excludeFolder url="file://$MODULE_DIR$/.idea" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/.vscode" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/.zed" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/packages/frontend/.astro" />
|
||||
<excludeFolder url="file://$MODULE_DIR$/scripts" />
|
||||
</content>
|
||||
<orderEntry type="inheritedJdk" />
|
||||
<orderEntry type="sourceFolder" forTests="false" />
|
||||
|
@ -1,6 +1,6 @@
|
||||
---
|
||||
icon: hand-wave
|
||||
description: 「中V档案馆」 (CVSA) 是一个收录中文歌声合成文化圈有关信息的网站。
|
||||
icon: hand-wave
|
||||
layout:
|
||||
title:
|
||||
visible: true
|
||||
@ -16,10 +16,10 @@ layout:
|
||||
|
||||
# 欢迎
|
||||
|
||||
欢迎阅读CVSA文档!
|
||||
欢迎阅读中V档案馆文档!
|
||||
|
||||
该文档包含有关中V档案馆项目的各种信息,包括本项目的有关信息、技术架构、访客指南、API文档等。
|
||||
|
||||
### 导航
|
||||
|
||||
<table data-view="cards"><thead><tr><th></th><th></th><th data-hidden data-card-cover data-type="files"></th><th data-hidden></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td><strong>关于本项目</strong></td><td>一些你可能想知道的…</td><td></td><td></td><td><a href="about/this-project.md">this-project.md</a></td></tr><tr><td><strong>技术架构</strong></td><td>关于本项目的技术细节</td><td></td><td></td><td><a href="broken-reference">Broken link</a></td></tr><tr><td><strong>API 文档</strong> </td><td>中V档案馆公开 API 的文档</td><td></td><td></td><td><a href="broken-reference">Broken link</a></td></tr><tr><td><strong>项目地址</strong></td><td>在 <a href="https://github.com/alikia2x/cvsa">GitHub</a> 或 <a href="https://gitee.com/alikia/cvsa">Gitee</a> 上查看本项目</td><td></td><td></td><td><a href="https://gitee.com/alikia/cvsa">https://gitee.com/alikia/cvsa</a></td></tr><tr><td>🇺🇸 English Version</td><td>Hint: There's a language switcher on the top-left corner, just to the right of the logo.</td><td></td><td></td><td><a href="https://app.gitbook.com/o/ZRcyqFK0ovlJduZb50X0/s/89Gi0XfqMigoQkEYJZZl/">CVSA Doc English</a></td></tr></tbody></table>
|
||||
<table data-view="cards"><thead><tr><th></th><th></th><th data-hidden data-card-target data-type="content-ref"></th></tr></thead><tbody><tr><td><strong>关于本项目</strong></td><td>一些你可能想知道的…</td><td><a href="about/this-project.md">this-project.md</a></td></tr><tr><td><strong>技术架构</strong></td><td>关于本项目的技术细节</td><td><a href="broken-reference">Broken link</a></td></tr><tr><td><strong>API 文档</strong> </td><td>中V档案馆公开 API 的文档</td><td><a href="broken-reference">Broken link</a></td></tr><tr><td>🇺🇸 English Version</td><td>Tip: There is a language selector in the header.</td><td><a href="https://app.gitbook.com/o/ZRcyqFK0ovlJduZb50X0/s/89Gi0XfqMigoQkEYJZZl/">CVSA Doc English</a></td></tr><tr><td><strong>项目地址</strong></td><td>在 <a href="https://github.com/alikia2x/cvsa">GitHub</a> 或 <a href="https://gitee.com/alikia/cvsa">Gitee</a> 上查看本项目</td><td><a href="https://gitee.com/alikia/cvsa">https://gitee.com/alikia/cvsa</a></td></tr><tr><td><strong>网站</strong></td><td>我们新上线的测试网站,查看目前数据库中的信息</td><td><a href="https://projectcvsa.com">https://projectcvsa.com</a></td></tr></tbody></table>
|
||||
|
@ -9,12 +9,12 @@
|
||||
|
||||
## 技术架构 <a href="#architecture" id="architecture"></a>
|
||||
|
||||
- [概览](architecture/overview.md)
|
||||
- [数据库结构](architecture/database-structure/README.md)
|
||||
- [歌曲类型](architecture/database-structure/type-of-song.md)
|
||||
- [人工智能](architecture/artificial-intelligence.md)
|
||||
- [消息队列](architecture/message-queue/README.md)
|
||||
- [LatestVideosQueue 队列](architecture/message-queue/latestvideosqueue-dui-lie.md)
|
||||
* [概览](architecture/overview.md)
|
||||
* [Crawler 模块介绍](architecture/crawler.md)
|
||||
* [数据库结构](architecture/database-structure/README.md)
|
||||
* [歌曲类型](architecture/database-structure/type-of-song.md)
|
||||
* [snapshot\_schedule 表](architecture/database-structure/table-snapshot_schedule.md)
|
||||
* [机器学习](architecture/machine-learning.md)
|
||||
|
||||
## API 文档 <a href="#api-doc" id="api-doc"></a>
|
||||
|
||||
|
@ -1,22 +1,32 @@
|
||||
# 收录范围
|
||||
|
||||
中V档案馆收录许多有关中文歌声合成的内容,包括歌曲、专辑、艺术家(发布者、调校师、编曲者等)、歌手以及引擎/声库。 
|
||||
中V档案馆收录许多有关中文歌声合成的内容,包括歌曲、专辑、艺术家(发布者、调校师、编曲者等)、歌手以及引擎/声库。
|
||||
|
||||
对于一首**歌曲**,必须满足以下条件才能被收录到中V档案馆中:
|
||||
对于一首**歌曲**,必须满足以下两个条件才能被收录到中V档案馆中:
|
||||
|
||||
#### VOCALOID·UATU 分区
|
||||
### 至少一行中文/中文虚拟歌手
|
||||
|
||||
原则上,中V档案馆中收录的歌曲必须包含在哔哩哔哩 VOCALOID·UTAU
|
||||
分区(分区ID为30)下的视频中。在某些特殊情况下,此规则可能不是强制的。
|
||||
歌曲歌词必须至少包含一行中文。否则,如果歌曲歌词不包含中文,则只有在使用中文虚拟歌手的情况下才会将其包含在中V档案馆中。
|
||||
|
||||
#### 至少一行中文
|
||||
我们对**中文虚拟歌手**的定义如下:
|
||||
|
||||
歌曲的歌词必须包含至少一行中文。这意味着,即使使用了仅支持中文的声库,如果歌曲的歌词中没有中文,也不会被收录到中V档案馆中(例如,跨语种调校)。
|
||||
1. 歌手主要使用中文声库(即歌手最广泛使用的声库是中文)。
|
||||
2. 歌手由位于中国大陆、香港、澳门或台湾的公司、组织、个人或团体运营。
|
||||
|
||||
#### 使用歌声合成器
|
||||
### 使用歌声合成器
|
||||
|
||||
歌曲的至少一行必须由歌声合成器生成(包括和声部分),才能被收录到中V档案馆中。
|
||||
歌曲的至少一行必须由歌声合成器合成(包括和声),才能被收录到中V档案馆中。
|
||||
|
||||
我们将歌声合成器定义为通过算法建模声音特征并根据输入的歌词、音高等参数生成音频的软件或系统,包括基于波形拼接的(如
|
||||
VOCALOID、UTAU)和基于 AI 的(如 Synthesizer V、ACE Studio)方法,**但不包括仅改变现有歌声音色的AI声音转换器**(例如
|
||||
[so-vits svc](https://github.com/svc-develop-team/so-vits-svc))。
|
||||
我们将歌声合成器定义为通过算法建模声音特征并根据输入的歌词、音高等参数生成音频的软件或系统,包括基于波形拼接的(如VOCALOID 1\~5、UTAU)和基于 AI 的(如 Synthesizer V、ACE Studio)方法,**但不包括仅改变现有歌声音色的AI声音转换器**(例如[so-vits svc](https://github.com/svc-develop-team/so-vits-svc))。
|
||||
|
||||
 
|
||||
|
||||
此外,歌曲必须出现在发布到哔哩哔哩中 VOCALOID·UTAU 分区下视频中,才能被我们的自动化程序观察到。我们欢迎编辑手动添加尚未上传到 bilibili或未归类到此类别的歌曲。
|
||||
|
||||
**新闻**
|
||||
|
||||
最近,哔哩哔哩似乎正在下线二级分区。这意味着VOCALOID·UTAU分区将无法从前端进入,创作者们也无法再将视频上传到该分区(只能选择“音乐区”)。
|
||||
|
||||
根据我们的实验,哔哩哔哩在后端仍然保留了二级分区的代码逻辑,新发布的歌曲可能仍在 VOCALOID·UTAU 分区中,相关API仍可正常工作。目前,有[报告](https://www.bilibili.com/opus/1041223385394184199)称部分新歌曲被归入了“音乐综合”子分区。。此外,我们观察到哔哩哔哩实际上并没有尊重创作者投稿时选择的分区,而是使用某种方法自动为视频分配分区。我们已经观察到有[稿件](https://www.bilibili.com/video/av114163368068672/)出现了被归类到非音乐区的问题。
|
||||
|
||||
我们仍在等待哔哩哔哩的后续行动,未来我们可能会调整自动化程序的抓取范围。
|
||||
|
@ -6,33 +6,28 @@
|
||||
|
||||
纵观整个互联网,对于「中文歌声合成」或「中文虚拟歌手」(常简称为中V或VC)相关信息进行较为系统、全面地整理收集的主要有以下几个网站:
|
||||
|
||||
- [萌娘百科](https://zh.moegirl.org.cn/):
|
||||
收录了大量中V歌曲及歌姬的信息,呈现形式为传统维基(基于[MediaWiki](https://www.mediawiki.org/))。
|
||||
- [VCPedia](https://vcpedia.cn/):
|
||||
由原萌娘百科中文歌声合成编辑团队的部分成员搭建,专属于中文歌声合成相关内容的信息集成站点[^1],呈现形式为传统维基(基于[MediaWiki](https://www.mediawiki.org/))。
|
||||
- [VocaDB](https://vocadb.net/):
|
||||
[一个围绕 Vocaloid、UTAU 和其他歌声合成器的协作数据库,其中包含艺术家、唱片、PV 等](#user-content-fn-2)[^2],其中包含大量中文歌声合成作品。
|
||||
- [天钿Daily](https://tdd.bunnyxt.com/):一个VC相关数据交流与分享的网站。致力于VC相关数据交流,定期抓取VC相关数据,选取有意义的纬度展示。
|
||||
* [萌娘百科](https://zh.moegirl.org.cn/): 收录了大量中V歌曲及歌姬的信息,呈现形式为传统维基(基于[MediaWiki](https://www.mediawiki.org/))。
|
||||
* [VCPedia](https://vcpedia.cn/): 由原萌娘百科中文歌声合成编辑团队的部分成员搭建,专属于中文歌声合成相关内容的信息集成站点,呈现形式为传统维基(基于[MediaWiki](https://www.mediawiki.org/))。
|
||||
* [VocaDB](https://vocadb.net/): [一个围绕 Vocaloid、UTAU 和其他歌声合成器的协作数据库,其中包含艺术家、唱片、PV 等](#user-content-fn-1)[^1],其中包含大量中文歌声合成作品。
|
||||
* [天钿Daily](https://tdd.bunnyxt.com/):一个VC相关数据交流与分享的网站。致力于VC相关数据交流,定期抓取VC相关数据,选取有意义的纬度展示。
|
||||
|
||||
上述网站中,或多或少存在一些不足,例如:
|
||||
|
||||
- 萌娘百科、VCPedia受限于传统维基,绝大多数内容依赖人工编辑。
|
||||
- VocaDB基于结构化数据库构建,由此可以依赖程序生成一些信息,但**条目收录**仍然完全依赖人工完成。
|
||||
- VocaDB主要专注于元数据展示,少有关于歌曲、作者等的描述性的文字,也缺乏描述性的背景信息。
|
||||
- 天钿Daily只展示歌曲的统计数据及历史趋势,没有关于歌曲其它信息的收集。
|
||||
* 萌娘百科、VCPedia受限于传统维基,绝大多数内容依赖人工编辑。
|
||||
* VocaDB基于结构化数据库构建,由此可以依赖程序生成一些信息,但**条目收录**仍然完全依赖人工完成。
|
||||
* VocaDB主要专注于元数据展示,少有关于歌曲、作者等的描述性的文字,也缺乏描述性的背景信息。
|
||||
* 天钿Daily只展示歌曲的统计数据及历史趋势,没有关于歌曲其它信息的收集。
|
||||
|
||||
因此,**中V档案馆**吸取前人经验,克服上述网站的不足,希望做到:
|
||||
|
||||
- 歌曲收录(指发现歌曲并创建条目)的完全自动化
|
||||
- 歌曲元信息提取的高度自动化
|
||||
- 歌曲统计数据收集的完全自动化
|
||||
- 在程序辅助的同时欢迎并鼓励贡献者参与编辑(主要为描述性内容)或纠错
|
||||
- 在适当的许可声明下,引用来自上述源的数据,使内容更加全面、丰富。
|
||||
* 歌曲收录(指发现歌曲并创建条目)的完全自动化
|
||||
* 歌曲元信息提取的高度自动化
|
||||
* 歌曲统计数据收集的完全自动化
|
||||
* 在程序辅助的同时欢迎并鼓励贡献者参与编辑(主要为描述性内容)或纠错
|
||||
* 在适当的许可声明下,引用来自上述源的数据,使内容更加全面、丰富。
|
||||
|
||||
---
|
||||
***
|
||||
|
||||
本文在[CC BY-NC-SA 4.0协议](https://creativecommons.org/licenses/by-nc-sa/4.0/)提供。
|
||||
|
||||
[^1]: 引用自[VCPedia](https://vcpedia.cn/%E9%A6%96%E9%A1%B5),于[知识共享 署名-非商业性使用-相同方式共享 3.0中国大陆 (CC BY-NC-SA 3.0 CN) 许可协议](https://creativecommons.org/licenses/by-nc-sa/3.0/cn/)下提供。
|
||||
|
||||
[^2]: 翻译自[VocaDB](https://vocadb.net/),于[CC BY 4.0协议](https://creativecommons.org/licenses/by/4.0/)下提供。
|
||||
[^1]: 翻译自[VocaDB](https://vocadb.net/),于[CC BY 4.0协议](https://creativecommons.org/licenses/by/4.0/)下提供。
|
||||
|
@ -1,6 +1,6 @@
|
||||
# 视频快照
|
||||
|
||||
{% openapi src="../.gitbook/assets/1.yaml" path="/video/{id}/snapshots" method="get" %}
|
||||
[1.yaml](../.gitbook/assets/1.yaml)
|
||||
{% openapi src="../.gitbook/assets/API-doc.yaml" path="/video/{id}/snapshots" method="get" %}
|
||||
[API-doc.yaml](../.gitbook/assets/API-doc.yaml)
|
||||
{% endopenapi %}
|
||||
|
||||
|
@ -1,13 +0,0 @@
|
||||
# 人工智能
|
||||
|
||||
CVSA 的自动化工作流高度依赖人工智能进行信息提取和分类。
|
||||
|
||||
我们目前使用的 AI 系统有:
|
||||
|
||||
#### Filter
|
||||
|
||||
位于项目根目录下的 `/filter/`,它将 [30 分区](../about/scope-of-inclusion.md#vocaloiduatu-fen-qu) 中的视频分为以下类别:
|
||||
|
||||
- 0:与中文人声合成无关
|
||||
- 1:中文人声合成原创曲
|
||||
- 2:中文人声合成的翻唱/混音歌曲
|
68
doc/zh/architecture/crawler.md
Normal file
68
doc/zh/architecture/crawler.md
Normal file
@ -0,0 +1,68 @@
|
||||
# Crawler 模块介绍
|
||||
|
||||
在中V档案馆的技术架构中,自动化是核心设计理念。`crawler` 模块负责整个数据采集流程,通过 [BullMQ](https://bullmq.io/) 实现任务的消息队列管理,支持高并发地处理多个采集任务。
|
||||
|
||||
系统的数据存储与状态管理采用了 Redis(用于缓存和实时数据)与 PostgreSQL(作为主数据库)的组合方式,确保了稳定性与高效性。
|
||||
|
||||
***
|
||||
|
||||
### 模块结构概览
|
||||
|
||||
#### `crawler/db` —— 数据库操作模块
|
||||
|
||||
负责与数据库的交互,提供创建、更新、查询等功能。
|
||||
|
||||
* `init.ts`:初始化 PostgreSQL 连接池。
|
||||
* `redis.ts`:配置 Redis 客户端。
|
||||
* `withConnection.ts`:导出 `withDatabaseConnection` 函数,用于包装数据库操作函数,提供数据库上下文。
|
||||
* 其他文件:每个文件对应数据库中的一张表,封装了该表的操作逻辑。
|
||||
|
||||
#### `crawler/ml` —— 机器学习模块
|
||||
|
||||
负责与机器学习模型相关的处理逻辑,主要用于视频内容的文本分类。
|
||||
|
||||
* `manager.ts`:定义了一个模型管理基类 `AIManager`。
|
||||
* `akari.ts`:实现了用于筛选歌曲视频的分类模型 `AkariProto`,继承自 `AIManager`。
|
||||
|
||||
#### `crawler/mq` —— 消息队列模块
|
||||
|
||||
整合 BullMQ,实现任务调度和异步处理。
|
||||
|
||||
**`crawler/mq/exec`**
|
||||
|
||||
该目录下包含了各类任务的处理函数。虽然这些函数并非 BullMQ 所直接定义的“worker”,但在文档中我们仍将其统一称为 **worker**(例如 `getVideoInfoWorker`、`takeBulkSnapshotForVideosWorker`)。
|
||||
|
||||
> **说明:**
|
||||
>
|
||||
> * `crawler/mq/exec` 中的函数称为 **worker**。
|
||||
> * `crawler/mq/workers` 中的函数我们称为 **BullMQ worker**。
|
||||
|
||||
**架构设计说明:**\
|
||||
由于 BullMQ 设计上每个队列只能有一个处理函数,我们通过 `switch` 语句在一个 worker 中区分并路由不同的任务类型,将其分发给相应的执行函数。
|
||||
|
||||
**`crawler/mq/workers`**
|
||||
|
||||
这个目录定义了真正的 BullMQ worker,用于消费对应队列中的任务,并调用具体的执行逻辑。
|
||||
|
||||
**`crawler/mq/task`**
|
||||
|
||||
为了保持 worker 函数的简洁与可维护性,部分复杂逻辑被抽离成独立的“任务(task)”函数,集中放在这个目录中。
|
||||
|
||||
#### `crawler/net` —— 网络请求模块
|
||||
|
||||
该模块用于与外部系统通信,负责所有网络请求的封装和管理。核心是 `net/delegate.ts` 中定义的 `NetworkDelegate` 类。
|
||||
|
||||
**`crawler/net/delegate.ts`**
|
||||
|
||||
这是我们进行大规模请求的主要实现,支持以下功能:
|
||||
|
||||
* 基于任务类型和代理的限速策略
|
||||
* 结合 serverless 架构,根据策略动态切换请求来源 IP
|
||||
|
||||
#### `crawler/utils` —— 工具函数模块
|
||||
|
||||
存放项目中通用的工具函数,供各模块调用。
|
||||
|
||||
#### `crawler/src` —— 主程序入口
|
||||
|
||||
该目录包含 crawler 的启动脚本。我们使用 [concurrently](https://www.npmjs.com/package/concurrently) 同时运行多个任务文件,实现并行处理。
|
@ -2,14 +2,21 @@
|
||||
|
||||
CVSA 使用 [PostgreSQL](https://www.postgresql.org/) 作为数据库。
|
||||
|
||||
CVSA 设计了两个
|
||||
CVSA 设计了两个数据库,`cvsa_main` 和 `cvsa_cred`。前者用于存储可公开的数据,而后者则存储用户相关的个人信息(如登录凭据、账户管理信息等)。
|
||||
|
||||
CVSA 的所有公开数据(不包括用户的个人数据)都存储在名为 `cvsa_main` 的数据库中,该数据库包含以下表:
|
||||
|
||||
- songs:存储歌曲的主要信息
|
||||
- bilibili\_user:存储 Bilibili 用户信息快照
|
||||
- bilibili\_metadata:[分区 30](../../about/scope-of-inclusion.md#vocaloiduatu-fen-qu) 中所有视频的元数据
|
||||
- labelling\_result:包含由我们的 AI 系统 标记的 `all_data` 中视频的标签。
|
||||
- latest\_video\_snapshot:存储视频最新的快照
|
||||
- video\_snapshot:存储视频的快照,包括特定时间下视频的统计信息(播放量、点赞数等)
|
||||
- snapshot\_schedule:视频快照的规划信息,为辅助表
|
||||
* songs:存储歌曲的主要信息。
|
||||
* bilibili\_user:存储哔哩哔哩 UP主 的元信息。
|
||||
* bilibili\_metadata:我们收录的哔哩哔哩所有视频的元数据。
|
||||
* labelling\_result:包含由我们的机器学习模型标记的 `bilibili_metadata` 中视频的标签。
|
||||
* latest\_video\_snapshot:存储视频最新的快照。
|
||||
* video\_snapshot:存储视频的快照,包括特定时间下视频的统计信息(播放量、点赞数等)。
|
||||
* snapshot\_schedule:视频快照的规划信息,为辅助表。
|
||||
|
||||
> **快照:**
|
||||
>
|
||||
> 我们定期采集哔哩哔哩视频的播放量、点赞收藏数等统计信息,在一个给定时间点下某支视频的统计数据即为该视频的一个快照。
|
||||
|
||||
|
||||
|
||||
|
@ -0,0 +1,43 @@
|
||||
# snapshot\_schedule 表
|
||||
|
||||
该表用于记录视频快照任务的调度信息。
|
||||
|
||||
### 字段说明
|
||||
|
||||
| 字段名 | 类型 | 是否为空 | 默认值 | 描述 |
|
||||
| ------------- | -------------------------- | ---- | ------------------------------------- | ------------ |
|
||||
| `id` | `bigint` | 否 | `nextval('snapshot_schedule_id_seq')` | 主键,自增ID |
|
||||
| `aid` | `bigint` | 否 | 无 | 哔哩哔哩视频的 AV 号 |
|
||||
| `type` | `text` | 是 | 无 | 快照类型。 |
|
||||
| `created_at` | `timestamp with time zone` | 否 | `CURRENT_TIMESTAMP` | 记录创建时间 |
|
||||
| `started_at` | `timestamp with time zone` | 是 | 无 | 计划开始拍摄快照的时间 |
|
||||
| `finished_at` | `timestamp with time zone` | 是 | 无 | 快照任务完成的时间 |
|
||||
| `status` | `text` | 否 | `'pending'` | 快照任务状态。 |
|
||||
|
||||
### 字段取值说明(待补充)
|
||||
|
||||
#### `type` 字段
|
||||
|
||||
用于标识快照的类型,例如是定期存档、成就节点、首次收录等。
|
||||
|
||||
* `archive`:每隔一段时间内,对`bilibili_metadata`表中所有视频的定期快照。
|
||||
* `milestone`:监测到曲目即将达成成就(殿堂/传说/神话)时,将会调度该类型的快照任务。
|
||||
* `new`:新观测到歌曲时,会在最长48小时内持续追踪其初始播放量增长趋势。
|
||||
* `normal`:对于所有`songs`表内的曲目,根据播放量增长速度,以动态间隔(6-72小时)定期进行的快照。
|
||||
|
||||
#### `status` 字段
|
||||
|
||||
用于标识快照任务的当前状态。
|
||||
|
||||
* `completed`:快照任务已经完成
|
||||
* `failed`:快照任务因不明原因失败
|
||||
* `no_proxy`:快照任务被执行,但当前没有代理可用于拍摄快照
|
||||
* `pending`:快照任务已经被调度,但尚未开始执行
|
||||
* `processing`:正在获取快照
|
||||
* `timeout`:快照任务在一定时间内没有被响应,因此被丢弃
|
||||
* `bili_error`: 哔哩哔哩返回了一个表示请求失败的状态码
|
||||
|
||||
### 备注
|
||||
|
||||
* 此表中的 `started_at` 字段为计划中的快照开始时间,实际执行时间可能与其略有偏差,具体执行记录可结合其他日志或任务表查看。
|
||||
* 每个 av 号在可以同时存在多个不同类型的快照任务处于 pending 状态,但对于同一种类型,只允许一个pending任务同时存在。
|
27
doc/zh/architecture/machine-learning.md
Normal file
27
doc/zh/architecture/machine-learning.md
Normal file
@ -0,0 +1,27 @@
|
||||
# 机器学习
|
||||
|
||||
中V档案馆的自动化工作流高度依赖机器学习进行信息提取和分类。
|
||||
|
||||
我们目前使用的机器学习系统有:
|
||||
|
||||
#### Filter (代号 Akari)
|
||||
|
||||
位于项目根目录下的 `/ml/filter/`,它是一个分类模型,将来自哔哩哔哩的视频分为以下类别:
|
||||
|
||||
* 0:与中文歌声合成无关
|
||||
* 1:中文歌声合成原创曲
|
||||
* 2:中文歌声合成的翻唱/Remix歌曲
|
||||
|
||||
它接收三个通道的纯文本:视频的标题、简介和标签,使用一个修改后的[model2vec](https://github.com/MinishLab/model2vec)模型(从[jina-embedding-v3](https://huggingface.co/jinaai/jina-embeddings-v3))从三个通道的文本分别产生1024维的嵌入向量作为表征,通过可学习的通道权重进行调整后送入一个隐藏层维度1296的单层全连接网络,最终连接到一个三分类器作为输出。我们使用了一个自定义的损失函数`AdaptiveRecallLoss`,以优化歌声合成作品的 recall(即使得第 0 类的 precision 尽可能高)。
|
||||
|
||||
|
||||
|
||||
此外,我们还有一些尚未投入生产的实验性工作:
|
||||
|
||||
#### Predictor
|
||||
|
||||
位于项目根目录下的 `/ml/pred/`,它预测视频的未来播放量。这是一个回归模型,它将视频的历史播放量趋势、其他上下文信息(例如当前时间)和要预测的未来时间增量作为特征输入,并输出视频播放量从“现在”到指定未来时间点的增量。
|
||||
|
||||
#### 歌词对齐
|
||||
|
||||
位于项目根目录下的 `/ml/lab/`,它分别使用 [MMS wav2vec](https://huggingface.co/docs/transformers/en/model_doc/mms) 和 [Whisper](https://github.com/openai/whisper) 模型进行音素级和行级对齐。这项工作的最初目的是驱动我们另一个项目 [AquaVox](https://github.com/alikia2x/aquavox) 中的实时歌词功能。
|
@ -1 +0,0 @@
|
||||
# 消息队列
|
@ -1 +0,0 @@
|
||||
# LatestVideosQueue 队列
|
@ -14,13 +14,30 @@ layout:
|
||||
|
||||
# 概览
|
||||
|
||||
整个CVSA项目分为三个组件:**crawler**, **frontend** 和 **backend。**
|
||||
CVSA 是一个 [monorepo](https://en.wikipedia.org/wiki/Monorepo) 代码库,使用 [Deno workspace](https://docs.deno.com/runtime/fundamentals/workspaces/) 作为monorepo管理工具,TypeScript 是主要的开发语言。
|
||||
|
||||
### **crawler**
|
||||
**项目结构:**
|
||||
|
||||
位于项目目录`packages/crawler` 下,它负责以下工作:
|
||||
```
|
||||
cvsa
|
||||
├── deno.json
|
||||
├── ml
|
||||
│ ├── filter
|
||||
│ ├── lab
|
||||
│ └── pred
|
||||
├── packages
|
||||
│ ├── backend
|
||||
│ ├── core
|
||||
│ ├── crawler
|
||||
│ └── frontend
|
||||
└── README.md
|
||||
```
|
||||
|
||||
- 抓取新的视频并收录作品
|
||||
- 持续监控视频的播放量等统计信息
|
||||
**其中, `packages` 为 monorepo 主要的根目录,包含 CVSA 主要的程序逻辑**
|
||||
|
||||
整个 crawler 由 BullMQ 消息队列驱动,使用 Redis 和 PostgreSQL 管理状态。
|
||||
* **`backend`**:这个模块包含使用 [Hono](https://hono.dev/) 框架构建的服务器端逻辑。它负责与数据库交互并通过 REST 和 GraphQL API 公开数据,供前端网站、应用和第三方使用。
|
||||
* **`frontend`**:中V档案馆的网站是 [Astro](https://astro.build/) 驱动的。这个模块包含完整的 Astro 前端项目。
|
||||
* **`crawler`**:这个模块包含中V档案馆的自动数据收集系统。它旨在自动发现和收集来自哔哩哔哩的新歌曲数据,以及跟踪相关统计数据(如播放量信息)。
|
||||
* **`core`**:这个模块内包含可重用和通用的代码。
|
||||
|
||||
`ml` 为机器学习相关包,参见
|
||||
|
@ -5,11 +5,14 @@
|
||||
"hono": "jsr:@hono/hono@^4.7.5",
|
||||
"zod": "npm:zod",
|
||||
"yup": "npm:yup",
|
||||
"@core/": "../core/"
|
||||
"@core/": "../core/",
|
||||
"log/": "../core/log/",
|
||||
"@crawler/net/videoInfo": "../crawler/net/getVideoInfo.ts",
|
||||
"ioredis": "npm:ioredis"
|
||||
},
|
||||
"tasks": {
|
||||
"dev": "deno serve --env-file=.env --allow-env --allow-net --watch main.ts",
|
||||
"start": "deno serve --env-file=.env --allow-env --allow-net --host 127.0.0.1 main.ts"
|
||||
"dev": "deno serve --env-file=.env --allow-env --allow-net --allow-read --allow-write --allow-run --watch main.ts",
|
||||
"start": "deno serve --env-file=.env --allow-env --allow-net --allow-read --allow-write --allow-run --host 127.0.0.1 main.ts"
|
||||
},
|
||||
"compilerOptions": {
|
||||
"jsx": "precompile",
|
||||
|
@ -3,6 +3,7 @@ import { dbCredMiddleware, dbMiddleware } from "./database.ts";
|
||||
import { rootHandler } from "./root.ts";
|
||||
import { getSnapshotsHanlder } from "./snapshots.ts";
|
||||
import { registerHandler } from "./register.ts";
|
||||
import { videoInfoHandler } from "./videoInfo.ts";
|
||||
|
||||
export const app = new Hono();
|
||||
|
||||
@ -14,10 +15,12 @@ app.get("/", ...rootHandler);
|
||||
app.get("/video/:id/snapshots", ...getSnapshotsHanlder);
|
||||
app.post("/user", ...registerHandler);
|
||||
|
||||
app.get("/video/:id/info", ...videoInfoHandler);
|
||||
|
||||
const fetch = app.fetch;
|
||||
|
||||
export default {
|
||||
fetch,
|
||||
} satisfies Deno.ServeDefaultExport;
|
||||
|
||||
export const VERSION = "0.3.0";
|
||||
export const VERSION = "0.4.2";
|
||||
|
@ -3,10 +3,10 @@ import { VERSION } from "./main.ts";
|
||||
import { createHandlers } from "./utils.ts";
|
||||
|
||||
export const rootHandler = createHandlers((c) => {
|
||||
let singer: Singer | Singer[] | null = null;
|
||||
let singer: Singer | Singer[];
|
||||
const shouldShowSpecialSinger = Math.random() < 0.016;
|
||||
if (getSingerForBirthday().length !== 0) {
|
||||
singer = getSingerForBirthday();
|
||||
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
|
||||
for (const s of singer) {
|
||||
delete s.birthday;
|
||||
s.message = `祝${s.name}生日快乐~`;
|
||||
|
@ -12,7 +12,7 @@ const SnapshotQueryParamsSchema = object({
|
||||
reverse: boolean().optional(),
|
||||
});
|
||||
|
||||
const idSchema = mixed().test(
|
||||
export const idSchema = mixed().test(
|
||||
"is-valid-id",
|
||||
'id must be a string starting with "av" followed by digits, or "BV" followed by 10 alphanumeric characters, or a positive integer',
|
||||
async (value) => {
|
||||
|
86
packages/backend/videoInfo.ts
Normal file
86
packages/backend/videoInfo.ts
Normal file
@ -0,0 +1,86 @@
|
||||
import logger from "log/logger.ts";
|
||||
import { Redis } from "ioredis";
|
||||
import { number, ValidationError } from "yup";
|
||||
import { createHandlers } from "./utils.ts";
|
||||
import { getVideoInfo, getVideoInfoByBV } from "@crawler/net/videoInfo";
|
||||
import { idSchema } from "./snapshots.ts";
|
||||
import { NetSchedulerError } from "@core/net/delegate.ts";
|
||||
import type { Context } from "hono";
|
||||
import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import type { BlankEnv, BlankInput } from "hono/types";
|
||||
import type { VideoInfoData } from "@core/net/bilibili.d.ts";
|
||||
|
||||
|
||||
const redis = new Redis({ maxRetriesPerRequest: null });
|
||||
const CACHE_EXPIRATION_SECONDS = 60;
|
||||
|
||||
type ContextType = Context<BlankEnv, "/video/:id/info", BlankInput>;
|
||||
|
||||
async function insertVideoSnapshot(client: Client, data: VideoInfoData) {
|
||||
const views = data.stat.view;
|
||||
const danmakus = data.stat.danmaku;
|
||||
const replies = data.stat.reply;
|
||||
const likes = data.stat.like;
|
||||
const coins = data.stat.coin;
|
||||
const shares = data.stat.share;
|
||||
const favorites = data.stat.favorite;
|
||||
const aid = data.aid;
|
||||
|
||||
const query: string = `
|
||||
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`;
|
||||
|
||||
await client.queryObject(
|
||||
query,
|
||||
[aid, views, danmakus, replies, likes, coins, shares, favorites],
|
||||
);
|
||||
|
||||
logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot");
|
||||
}
|
||||
|
||||
|
||||
export const videoInfoHandler = createHandlers(async (c: ContextType) => {
|
||||
const client = c.get("db");
|
||||
try {
|
||||
const id = await idSchema.validate(c.req.param("id"));
|
||||
let videoId: string | number = id as string;
|
||||
if (videoId.startsWith("av")) {
|
||||
videoId = parseInt(videoId.slice(2));
|
||||
} else if (await number().isValid(videoId)) {
|
||||
videoId = parseInt(videoId);
|
||||
}
|
||||
|
||||
const cacheKey = `cvsa:videoInfo:${videoId}`;
|
||||
const cachedData = await redis.get(cacheKey);
|
||||
|
||||
if (cachedData) {
|
||||
return c.json(JSON.parse(cachedData));
|
||||
}
|
||||
|
||||
let result: VideoInfoData | number;
|
||||
if (typeof videoId === "number") {
|
||||
result = await getVideoInfo(videoId, "getVideoInfo");
|
||||
} else {
|
||||
result = await getVideoInfoByBV(videoId, "getVideoInfo");
|
||||
}
|
||||
|
||||
if (typeof result === "number") {
|
||||
return c.json({ message: "Error fetching video info", code: result }, 500);
|
||||
}
|
||||
|
||||
await redis.setex(cacheKey, CACHE_EXPIRATION_SECONDS, JSON.stringify(result));
|
||||
|
||||
await insertVideoSnapshot(client, result);
|
||||
|
||||
return c.json(result);
|
||||
} catch (e) {
|
||||
if (e instanceof ValidationError) {
|
||||
return c.json({ message: "Invalid query parameters", errors: e.errors }, 400);
|
||||
} else if (e instanceof NetSchedulerError) {
|
||||
return c.json({ message: "Error fetching video info", code: e.code }, 500);
|
||||
} else {
|
||||
return c.json({ message: "Unhandled error", error: e }, 500);
|
||||
}
|
||||
}
|
||||
});
|
@ -1,4 +1,12 @@
|
||||
{
|
||||
"name": "@cvsa/core",
|
||||
"exports": "./main.ts"
|
||||
"exports": "./main.ts",
|
||||
"imports": {
|
||||
"ioredis": "npm:ioredis",
|
||||
"log/": "./log/",
|
||||
"db/": "./db/",
|
||||
"$std/": "https://deno.land/std@0.216.0/",
|
||||
"mq/": "./mq/",
|
||||
"chalk": "npm:chalk"
|
||||
}
|
||||
}
|
||||
|
@ -1,5 +1,5 @@
|
||||
import winston, { format, transports } from "npm:winston";
|
||||
import { TransformableInfo } from "npm:logform";
|
||||
import type { TransformableInfo } from "npm:logform";
|
||||
import chalk from "chalk";
|
||||
|
||||
const customFormat = format.printf((info: TransformableInfo) => {
|
@ -1,4 +1,4 @@
|
||||
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||
import { SlidingWindow } from "./slidingWindow.ts";
|
||||
|
||||
export interface RateLimiterConfig {
|
||||
window: SlidingWindow;
|
@ -1,5 +1,5 @@
|
||||
import logger from "log/logger.ts";
|
||||
import { RateLimiter, RateLimiterConfig } from "mq/rateLimiter.ts";
|
||||
import { RateLimiter, type RateLimiterConfig } from "mq/rateLimiter.ts";
|
||||
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||
import { redis } from "db/redis.ts";
|
||||
import Redis from "ioredis";
|
||||
@ -69,14 +69,6 @@ class NetworkDelegate {
|
||||
this.proxies[proxyName] = { type, data };
|
||||
}
|
||||
|
||||
private cleanupProxyLimiters(proxyName: string): void {
|
||||
for (const limiterId in this.proxyLimiters) {
|
||||
if (limiterId.startsWith(`proxy-${proxyName}`)) {
|
||||
delete this.proxyLimiters[limiterId];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
|
||||
this.tasks[taskName] = { provider, proxies };
|
||||
}
|
||||
@ -271,6 +263,7 @@ class NetworkDelegate {
|
||||
const out = decoder.decode(output.stdout);
|
||||
const rawData = JSON.parse(out);
|
||||
if (rawData.statusCode !== 200) {
|
||||
// noinspection ExceptionCaughtLocallyJS
|
||||
throw new NetSchedulerError(
|
||||
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
||||
"ALICLOUD_PROXY_ERR",
|
@ -4,7 +4,15 @@ import { SnapshotNumber } from "mq/task/getVideoStats.ts";
|
||||
|
||||
export async function getVideosNearMilestone(client: Client) {
|
||||
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||
SELECT ls
|
||||
SELECT ls.*
|
||||
FROM latest_video_snapshot ls
|
||||
RIGHT JOIN songs ON songs.aid = ls.aid
|
||||
WHERE
|
||||
(views >= 50000 AND views < 100000) OR
|
||||
(views >= 900000 AND views < 1000000) OR
|
||||
(views >= 9900000 AND views < 10000000)
|
||||
UNION
|
||||
SELECT ls.*
|
||||
FROM latest_video_snapshot ls
|
||||
WHERE
|
||||
(views >= 90000 AND views < 100000) OR
|
||||
|
@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { SnapshotScheduleType } from "@core/db/schema";
|
||||
import logger from "log/logger.ts";
|
||||
import { MINUTE } from "$std/datetime/constants.ts";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { redis } from "@core/db/redis.ts";
|
||||
import { Redis } from "ioredis";
|
||||
|
||||
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
||||
@ -69,6 +69,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
|
||||
return res.rows.length > 0;
|
||||
}
|
||||
|
||||
export async function videoHasActiveScheduleWithType(client: Client, aid: number, type: string) {
|
||||
const res = await client.queryObject<{ status: string }>(
|
||||
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing') AND type = $2`,
|
||||
[aid, type],
|
||||
);
|
||||
return res.rows.length > 0;
|
||||
}
|
||||
|
||||
export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
||||
const res = await client.queryObject<{ status: string }>(
|
||||
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
|
||||
@ -173,7 +181,7 @@ export async function scheduleSnapshot(
|
||||
targetTime: number,
|
||||
force: boolean = false,
|
||||
) {
|
||||
if (await videoHasActiveSchedule(client, aid) && !force) return;
|
||||
if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return;
|
||||
let adjustedTime = new Date(targetTime);
|
||||
if (type !== "milestone" && type !== "new") {
|
||||
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
|
||||
@ -264,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) {
|
||||
|
||||
export async function getBulkSnapshotsInNextSecond(client: Client) {
|
||||
const query = `
|
||||
SELECT *
|
||||
FROM snapshot_schedule
|
||||
WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
|
||||
ORDER BY started_at
|
||||
LIMIT 1000;
|
||||
SELECT *
|
||||
FROM snapshot_schedule
|
||||
WHERE (started_at <= NOW() + INTERVAL '15 seconds')
|
||||
AND status = 'pending'
|
||||
AND (type = 'normal' OR type = 'archive')
|
||||
ORDER BY CASE
|
||||
WHEN type = 'normal' THEN 1
|
||||
WHEN type = 'archive' THEN 2
|
||||
END,
|
||||
started_at
|
||||
LIMIT 1000;
|
||||
`;
|
||||
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||
return res.rows;
|
||||
@ -298,3 +312,15 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||
return res.rows.map((r) => Number(r.aid));
|
||||
}
|
||||
|
||||
export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||
const query: string = `
|
||||
SELECT s.aid
|
||||
FROM bilibili_metadata s
|
||||
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||
WHERE ss.aid IS NULL
|
||||
`;
|
||||
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||
return res.rows.map((r) => Number(r.aid));
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,7 @@
|
||||
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||
"all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||
},
|
||||
"lint": {
|
||||
@ -27,7 +27,8 @@
|
||||
"bullmq": "npm:bullmq",
|
||||
"mq/": "./mq/",
|
||||
"db/": "./db/",
|
||||
"log/": "./log/",
|
||||
"@core/": "../core/",
|
||||
"log/": "../core/log/",
|
||||
"net/": "./net/",
|
||||
"ml/": "./ml/",
|
||||
"utils/": "./utils/",
|
||||
|
@ -6,7 +6,7 @@ import {
|
||||
bulkScheduleSnapshot,
|
||||
bulkSetSnapshotStatus,
|
||||
findClosestSnapshot,
|
||||
findSnapshotBefore,
|
||||
findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
|
||||
getBulkSnapshotsInNextSecond,
|
||||
getLatestSnapshot,
|
||||
getSnapshotsInNextSecond,
|
||||
@ -21,13 +21,14 @@ import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import { SnapshotQueue } from "mq/index.ts";
|
||||
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
||||
import { NetSchedulerError } from "net/delegate.ts";
|
||||
import { NetSchedulerError } from "@core/net/delegate.ts";
|
||||
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
||||
import { truncate } from "utils/truncate.ts";
|
||||
import { lockManager } from "mq/lockManager.ts";
|
||||
import { getSongsPublihsedAt } from "db/songs.ts";
|
||||
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
||||
import { getAdjustedShortTermETA } from "../scheduling.ts";
|
||||
import {SnapshotScheduleType} from "@core/db/schema";
|
||||
|
||||
const priorityMap: { [key: string]: number } = {
|
||||
"milestone": 1,
|
||||
@ -52,15 +53,22 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
|
||||
const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
|
||||
if (filteredAids.length === 0) continue;
|
||||
await bulkSetSnapshotStatus(client, filteredAids, "processing");
|
||||
const dataMap: { [key: number]: number } = {};
|
||||
for (const schedule of group) {
|
||||
const id = Number(schedule.id);
|
||||
dataMap[id] = Number(schedule.aid);
|
||||
}
|
||||
const schedulesData = group.map((schedule) => {
|
||||
return {
|
||||
aid: Number(schedule.aid),
|
||||
id: Number(schedule.id),
|
||||
type: schedule.type,
|
||||
created_at: schedule.created_at,
|
||||
started_at: schedule.started_at,
|
||||
finished_at: schedule.finished_at,
|
||||
status: schedule.status
|
||||
}
|
||||
})
|
||||
await SnapshotQueue.add("bulkSnapshotVideo", {
|
||||
map: dataMap,
|
||||
schedules: schedulesData,
|
||||
}, { priority: 3 });
|
||||
}
|
||||
return `OK`
|
||||
} catch (e) {
|
||||
logger.error(e as Error);
|
||||
} finally {
|
||||
@ -74,7 +82,7 @@ export const snapshotTickWorker = async (_job: Job) => {
|
||||
const schedules = await getSnapshotsInNextSecond(client);
|
||||
for (const schedule of schedules) {
|
||||
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
||||
return `ALREADY_PROCESSING`;
|
||||
continue;
|
||||
}
|
||||
let priority = 3;
|
||||
if (schedule.type && priorityMap[schedule.type]) {
|
||||
@ -83,11 +91,12 @@ export const snapshotTickWorker = async (_job: Job) => {
|
||||
const aid = Number(schedule.aid);
|
||||
await setSnapshotStatus(client, schedule.id, "processing");
|
||||
await SnapshotQueue.add("snapshotVideo", {
|
||||
aid: aid,
|
||||
aid: Number(aid),
|
||||
id: Number(schedule.id),
|
||||
type: schedule.type ?? "normal",
|
||||
}, { priority });
|
||||
}
|
||||
return `OK`;
|
||||
} catch (e) {
|
||||
logger.error(e as Error);
|
||||
} finally {
|
||||
@ -108,14 +117,15 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
||||
for (const video of videos) {
|
||||
const aid = Number(video.aid);
|
||||
const eta = await getAdjustedShortTermETA(client, aid);
|
||||
if (eta > 72) continue;
|
||||
if (eta > 144) continue;
|
||||
const now = Date.now();
|
||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||
const maxInterval = 4 * HOUR;
|
||||
const minInterval = 1 * SECOND;
|
||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||
const targetTime = now + delay;
|
||||
await scheduleSnapshot(client, aid, "milestone", targetTime, true);
|
||||
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||
logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
||||
@ -143,6 +153,38 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
||||
return 6;
|
||||
};
|
||||
|
||||
export const archiveSnapshotsWorker = async (_job: Job) => {
|
||||
const client = await db.connect();
|
||||
const startedAt = Date.now();
|
||||
if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
|
||||
logger.log("dispatchArchiveSnapshots is already running", "mq");
|
||||
client.release();
|
||||
return;
|
||||
}
|
||||
await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
|
||||
try {
|
||||
const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
|
||||
for (const rawAid of aids) {
|
||||
const aid = Number(rawAid);
|
||||
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
||||
const now = Date.now();
|
||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||
const interval = 168;
|
||||
logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker");
|
||||
const targetTime = lastSnapshotedAt + interval * HOUR;
|
||||
await scheduleSnapshot(client, aid, "archive", targetTime);
|
||||
if (now - startedAt > 250 * MINUTE) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
|
||||
} finally {
|
||||
await lockManager.releaseLock("dispatchArchiveSnapshots");
|
||||
client.release();
|
||||
}
|
||||
};
|
||||
|
||||
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||
const client = await db.connect();
|
||||
const startedAt = Date.now();
|
||||
@ -176,13 +218,14 @@ export const regularSnapshotsWorker = async (_job: Job) => {
|
||||
};
|
||||
|
||||
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
||||
const dataMap: { [key: number]: number } = job.data.map;
|
||||
const ids = Object.keys(dataMap).map((id) => Number(id));
|
||||
const schedules: SnapshotScheduleType[] = job.data.schedules;
|
||||
const ids = schedules.map((schedule) => Number(schedule.id));
|
||||
const aidsToFetch: number[] = [];
|
||||
const client = await db.connect();
|
||||
try {
|
||||
for (const id of ids) {
|
||||
const aid = Number(dataMap[id]);
|
||||
for (const schedule of schedules) {
|
||||
const aid = Number(schedule.aid);
|
||||
const id = Number(schedule.id);
|
||||
const exists = await snapshotScheduleExists(client, id);
|
||||
if (!exists) {
|
||||
continue;
|
||||
@ -217,7 +260,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
||||
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
|
||||
}
|
||||
await bulkSetSnapshotStatus(client, ids, "completed");
|
||||
for (const aid of aidsToFetch) {
|
||||
|
||||
for (const schedule of schedules) {
|
||||
const aid = Number(schedule.aid);
|
||||
const type = schedule.type;
|
||||
if (type == 'archive') continue;
|
||||
const interval = await getRegularSnapshotInterval(client, aid);
|
||||
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
||||
await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
|
||||
@ -230,8 +277,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
||||
"mq",
|
||||
"fn:takeBulkSnapshotForVideosWorker",
|
||||
);
|
||||
await bulkSetSnapshotStatus(client, ids, "completed");
|
||||
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
|
||||
await bulkSetSnapshotStatus(client, ids, "no_proxy");
|
||||
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
|
||||
return;
|
||||
}
|
||||
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
|
||||
@ -296,10 +343,11 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||
}
|
||||
if (type !== "milestone") return `DONE`;
|
||||
const eta = await getAdjustedShortTermETA(client, aid);
|
||||
if (eta > 72) return "ETA_TOO_LONG";
|
||||
if (eta > 144) return "ETA_TOO_LONG";
|
||||
const now = Date.now();
|
||||
const targetTime = now + eta * HOUR;
|
||||
await scheduleSnapshot(client, aid, type, targetTime);
|
||||
await setSnapshotStatus(client, id, "completed");
|
||||
return `DONE`;
|
||||
} catch (e) {
|
||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||
@ -308,7 +356,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||
"mq",
|
||||
"fn:takeSnapshotForVideoWorker",
|
||||
);
|
||||
await setSnapshotStatus(client, id, "completed");
|
||||
await setSnapshotStatus(client, id, "no_proxy");
|
||||
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
||||
return;
|
||||
}
|
||||
|
@ -1,9 +1,9 @@
|
||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
|
||||
import { db } from "db/init.ts";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { redis } from "@core/db/redis.ts";
|
||||
|
||||
export async function initMQ() {
|
||||
const client = await db.connect();
|
||||
@ -30,8 +30,8 @@ export async function initMQ() {
|
||||
immediately: true,
|
||||
}, {
|
||||
opts: {
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
removeOnComplete: 300,
|
||||
removeOnFail: 600,
|
||||
},
|
||||
});
|
||||
|
||||
@ -40,8 +40,8 @@ export async function initMQ() {
|
||||
immediately: true,
|
||||
}, {
|
||||
opts: {
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
removeOnComplete: 60,
|
||||
removeOnFail: 600,
|
||||
},
|
||||
});
|
||||
|
||||
@ -55,6 +55,11 @@ export async function initMQ() {
|
||||
immediately: true,
|
||||
});
|
||||
|
||||
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
|
||||
every: 6 * HOUR,
|
||||
immediately: true,
|
||||
});
|
||||
|
||||
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
||||
every: 30 * MINUTE,
|
||||
immediately: true,
|
||||
|
@ -1,5 +1,5 @@
|
||||
import { Redis } from "ioredis";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { redis } from "../../core/db/redis.ts";
|
||||
|
||||
class LockManager {
|
||||
private redis: Redis;
|
||||
|
@ -1,9 +1,3 @@
|
||||
/*
|
||||
* Returns the minimum ETA in hours for the next snapshot
|
||||
* @param client - Postgres client
|
||||
* @param aid - aid of the video
|
||||
* @returns ETA in hours
|
||||
*/
|
||||
import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db/snapshotSchedule.ts";
|
||||
import { truncate } from "utils/truncate.ts";
|
||||
import { closetMilestone } from "./exec/snapshotTick.ts";
|
||||
@ -12,6 +6,12 @@ import { HOUR, MINUTE } from "$std/datetime/constants.ts";
|
||||
|
||||
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
|
||||
|
||||
/*
|
||||
* Returns the minimum ETA in hours for the next snapshot
|
||||
* @param client - Postgres client
|
||||
* @param aid - aid of the video
|
||||
* @returns ETA in hours
|
||||
*/
|
||||
export const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||
// Immediately dispatch a snapshot if there is no snapshot yet
|
||||
|
@ -1,5 +1,5 @@
|
||||
import networkDelegate from "./delegate.ts";
|
||||
import { MediaListInfoData, MediaListInfoResponse } from "net/bilibili.d.ts";
|
||||
import networkDelegate from "@core/net/delegate.ts";
|
||||
import { MediaListInfoData, MediaListInfoResponse } from "@core/net/bilibili.d.ts";
|
||||
import logger from "log/logger.ts";
|
||||
|
||||
/*
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { VideoListResponse } from "net/bilibili.d.ts";
|
||||
import { VideoListResponse } from "@core/net/bilibili.d.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import networkDelegate from "./delegate.ts";
|
||||
import networkDelegate from "@core/net/delegate.ts";
|
||||
|
||||
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
|
||||
const startFrom = 1 + pageSize * (page - 1);
|
||||
|
@ -1,5 +1,5 @@
|
||||
import networkDelegate from "./delegate.ts";
|
||||
import { VideoDetailsData, VideoDetailsResponse } from "net/bilibili.d.ts";
|
||||
import networkDelegate from "@core/net/delegate.ts";
|
||||
import { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d.ts";
|
||||
import logger from "log/logger.ts";
|
||||
|
||||
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
|
||||
|
@ -1,5 +1,5 @@
|
||||
import networkDelegate from "./delegate.ts";
|
||||
import { VideoInfoData, VideoInfoResponse } from "net/bilibili.d.ts";
|
||||
import networkDelegate from "@core/net/delegate.ts";
|
||||
import { VideoInfoData, VideoInfoResponse } from "@core/net/bilibili.d.ts";
|
||||
import logger from "log/logger.ts";
|
||||
|
||||
/*
|
||||
@ -25,3 +25,27 @@ export async function getVideoInfo(aid: number, task: string): Promise<VideoInfo
|
||||
}
|
||||
return data.data;
|
||||
}
|
||||
|
||||
/*
|
||||
* Fetch video metadata from bilibili API by BVID
|
||||
* @param {string} bvid - The video's BVID
|
||||
* @param {string} task - The task name used in scheduler. It can be one of the following:
|
||||
* - snapshotVideo
|
||||
* - getVideoInfo
|
||||
* - snapshotMilestoneVideo
|
||||
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
|
||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||
*/
|
||||
export async function getVideoInfoByBV(bvid: string, task: string): Promise<VideoInfoData | number> {
|
||||
const url = `https://api.bilibili.com/x/web-interface/view?bvid=${bvid}`;
|
||||
const data = await networkDelegate.request<VideoInfoResponse>(url, task);
|
||||
const errMessage = `Error fetching metadata for ${bvid}:`;
|
||||
if (data.code !== 0) {
|
||||
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfoByBV");
|
||||
return data.code;
|
||||
}
|
||||
return data.data;
|
||||
}
|
@ -1,5 +1,5 @@
|
||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { redis } from "../../core/db/redis.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
|
||||
import { WorkerError } from "mq/schema.ts";
|
||||
|
@ -1,11 +1,12 @@
|
||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||
import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
|
||||
import { redis } from "db/redis.ts";
|
||||
import { redis } from "@core/db/redis.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import { lockManager } from "mq/lockManager.ts";
|
||||
import { WorkerError } from "mq/schema.ts";
|
||||
import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
|
||||
import {
|
||||
archiveSnapshotsWorker,
|
||||
bulkSnapshotTickWorker,
|
||||
collectMilestoneSnapshotsWorker,
|
||||
regularSnapshotsWorker,
|
||||
@ -15,8 +16,21 @@ import {
|
||||
takeSnapshotForVideoWorker,
|
||||
} from "mq/exec/snapshotTick.ts";
|
||||
|
||||
const releaseLockForJob = async (name: string) => {
|
||||
await lockManager.releaseLock(name);
|
||||
logger.log(`Released lock: ${name}`, "mq");
|
||||
}
|
||||
|
||||
const releaseAllLocks = async () => {
|
||||
const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
|
||||
for (const lock of locks) {
|
||||
await releaseLockForJob(lock);
|
||||
}
|
||||
}
|
||||
|
||||
Deno.addSignalListener("SIGINT", async () => {
|
||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||
await releaseAllLocks();
|
||||
await latestVideoWorker.close(true);
|
||||
await snapshotWorker.close(true);
|
||||
Deno.exit();
|
||||
@ -24,6 +38,7 @@ Deno.addSignalListener("SIGINT", async () => {
|
||||
|
||||
Deno.addSignalListener("SIGTERM", async () => {
|
||||
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||
await releaseAllLocks();
|
||||
await latestVideoWorker.close(true);
|
||||
await snapshotWorker.close(true);
|
||||
Deno.exit();
|
||||
@ -34,14 +49,11 @@ const latestVideoWorker = new Worker(
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case "getLatestVideos":
|
||||
await getLatestVideosWorker(job);
|
||||
break;
|
||||
return await getLatestVideosWorker(job);
|
||||
case "getVideoInfo":
|
||||
await getVideoInfoWorker(job);
|
||||
break;
|
||||
return await getVideoInfoWorker(job);
|
||||
case "collectSongs":
|
||||
await collectSongsWorker(job);
|
||||
break;
|
||||
return await collectSongsWorker(job);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -63,35 +75,26 @@ latestVideoWorker.on("error", (err) => {
|
||||
logger.error(e.rawError, e.service, e.codePath);
|
||||
});
|
||||
|
||||
latestVideoWorker.on("closed", async () => {
|
||||
await lockManager.releaseLock("getLatestVideos");
|
||||
});
|
||||
|
||||
const snapshotWorker = new Worker(
|
||||
"snapshot",
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case "snapshotVideo":
|
||||
await takeSnapshotForVideoWorker(job);
|
||||
break;
|
||||
return await takeSnapshotForVideoWorker(job);
|
||||
case "snapshotTick":
|
||||
await snapshotTickWorker(job);
|
||||
break;
|
||||
return await snapshotTickWorker(job);
|
||||
case "collectMilestoneSnapshots":
|
||||
await collectMilestoneSnapshotsWorker(job);
|
||||
break;
|
||||
return await collectMilestoneSnapshotsWorker(job);
|
||||
case "dispatchRegularSnapshots":
|
||||
await regularSnapshotsWorker(job);
|
||||
break;
|
||||
return await regularSnapshotsWorker(job);
|
||||
case "scheduleCleanup":
|
||||
await scheduleCleanupWorker(job);
|
||||
break;
|
||||
return await scheduleCleanupWorker(job);
|
||||
case "bulkSnapshotVideo":
|
||||
await takeBulkSnapshotForVideosWorker(job);
|
||||
break;
|
||||
return await takeBulkSnapshotForVideosWorker(job);
|
||||
case "bulkSnapshotTick":
|
||||
await bulkSnapshotTickWorker(job);
|
||||
break;
|
||||
return await bulkSnapshotTickWorker(job);
|
||||
case "dispatchArchiveSnapshots":
|
||||
return await archiveSnapshotsWorker(job);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -102,8 +105,4 @@ const snapshotWorker = new Worker(
|
||||
snapshotWorker.on("error", (err) => {
|
||||
const e = err as WorkerError;
|
||||
logger.error(e.rawError, e.service, e.codePath);
|
||||
});
|
||||
|
||||
snapshotWorker.on("closed", async () => {
|
||||
await lockManager.releaseLock("dispatchRegularSnapshots");
|
||||
});
|
||||
});
|
1012
packages/frontend/bun.lock
Normal file
1012
packages/frontend/bun.lock
Normal file
File diff suppressed because it is too large
Load Diff
@ -1,25 +1,28 @@
|
||||
{
|
||||
"name": "frontend",
|
||||
"type": "module",
|
||||
"version": "0.0.1",
|
||||
"scripts": {
|
||||
"dev": "astro dev",
|
||||
"build": "astro build",
|
||||
"preview": "astro preview",
|
||||
"astro": "astro"
|
||||
},
|
||||
"dependencies": {
|
||||
"@astrojs/tailwind": "^6.0.2",
|
||||
"argon2id": "^1.0.1",
|
||||
"astro": "^5.5.5",
|
||||
"autoprefixer": "^10.4.21",
|
||||
"pg": "^8.11.11",
|
||||
"postcss": "^8.5.3",
|
||||
"tailwindcss": "^3.0.24",
|
||||
"vite-tsconfig-paths": "^5.1.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@rollup/plugin-wasm": "^6.2.2",
|
||||
"@types/pg": "^8.11.11"
|
||||
}
|
||||
"name": "frontend",
|
||||
"type": "module",
|
||||
"version": "0.0.1",
|
||||
"scripts": {
|
||||
"dev": "astro dev",
|
||||
"build": "astro build",
|
||||
"preview": "astro preview",
|
||||
"astro": "astro"
|
||||
},
|
||||
"dependencies": {
|
||||
"@astrojs/node": "^9.1.3",
|
||||
"@astrojs/svelte": "^7.0.9",
|
||||
"@astrojs/tailwind": "^6.0.2",
|
||||
"argon2id": "^1.0.1",
|
||||
"astro": "^5.5.5",
|
||||
"autoprefixer": "^10.4.21",
|
||||
"pg": "^8.11.11",
|
||||
"postcss": "^8.5.3",
|
||||
"svelte": "^5.25.7",
|
||||
"tailwindcss": "^3.0.24",
|
||||
"vite-tsconfig-paths": "^5.1.4"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@rollup/plugin-wasm": "^6.2.2",
|
||||
"@types/pg": "^8.11.11"
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user