Compare commits


1 Commit

Author SHA1 Message Date
01e076d745
fix: incorrect condition for filtering the tags 2025-03-23 13:44:03 +08:00
493 changed files with 2666 additions and 25028 deletions

@@ -1,96 +0,0 @@
built/*
tests/cases/rwc/*
tests/cases/perf/*
!tests/cases/webharness/compilerToString.js
test-args.txt
~*.docx
\#*\#
.\#*
tests/baselines/local/*
tests/baselines/local.old/*
tests/services/baselines/local/*
tests/baselines/prototyping/local/*
tests/baselines/rwc/*
tests/baselines/reference/projectOutput/*
tests/baselines/local/projectOutput/*
tests/baselines/reference/testresults.tap
tests/baselines/symlinks/*
tests/services/baselines/prototyping/local/*
tests/services/browser/typescriptServices.js
src/harness/*.js
src/compiler/diagnosticInformationMap.generated.ts
src/compiler/diagnosticMessages.generated.json
src/parser/diagnosticInformationMap.generated.ts
src/parser/diagnosticMessages.generated.json
rwc-report.html
*.swp
build.json
*.actual
tests/webTestServer.js
tests/webTestServer.js.map
tests/webhost/*.d.ts
tests/webhost/webtsc.js
tests/cases/**/*.js
tests/cases/**/*.js.map
*.config
scripts/eslint/built/
scripts/debug.bat
scripts/run.bat
scripts/**/*.js
scripts/**/*.js.map
coverage/
internal/
**/.DS_Store
.settings
**/.vs
**/.vscode/*
!**/.vscode/tasks.json
!**/.vscode/settings.template.json
!**/.vscode/launch.template.json
!**/.vscode/extensions.json
!tests/cases/projects/projectOption/**/node_modules
!tests/cases/projects/NodeModulesSearch/**/*
!tests/baselines/reference/project/nodeModules*/**/*
yarn.lock
yarn-error.log
.parallelperf.*
tests/baselines/reference/dt
.failed-tests
TEST-results.xml
package-lock.json
.eslintcache
*v8.log
# dotenv environment variable files
.env
.env.development.local
.env.test.local
.env.production.local
.env.local
# npm dependencies
node_modules/
# project specific
logs/
__pycache__
ml/filter/runs
ml/pred/runs
ml/pred/checkpoints
ml/pred/observed
ml/data/
ml/filter/checkpoints
scripts
model/
.astro
# Database
*.dump
*.db
*.sqlite
*.sqlite3
data/
docker-compose.yml

.gitattributes (vendored, 1 change)

@@ -1 +0,0 @@
*.woff2 filter=lfs diff=lfs merge=lfs -text

.gitignore (vendored, 95 changes)

@@ -1,44 +1,87 @@
built/*
tests/cases/rwc/*
tests/cases/perf/*
!tests/cases/webharness/compilerToString.js
test-args.txt
~*.docx
\#*\#
.\#*
tests/baselines/local/*
tests/baselines/local.old/*
tests/services/baselines/local/*
tests/baselines/prototyping/local/*
tests/baselines/rwc/*
tests/baselines/reference/projectOutput/*
tests/baselines/local/projectOutput/*
tests/baselines/reference/testresults.tap
tests/baselines/symlinks/*
tests/services/baselines/prototyping/local/*
tests/services/browser/typescriptServices.js
src/harness/*.js
src/compiler/diagnosticInformationMap.generated.ts
src/compiler/diagnosticMessages.generated.json
src/parser/diagnosticInformationMap.generated.ts
src/parser/diagnosticMessages.generated.json
rwc-report.html
*.swp
build.json
*.actual
tests/webTestServer.js
tests/webTestServer.js.map
tests/webhost/*.d.ts
tests/webhost/webtsc.js
tests/cases/**/*.js
tests/cases/**/*.js.map
*.config
scripts/eslint/built/
scripts/debug.bat
scripts/run.bat
scripts/**/*.js
scripts/**/*.js.map
coverage/
internal/
**/.DS_Store
.settings
**/.vs
**/.vscode/*
!**/.vscode/tasks.json
!**/.vscode/settings.template.json
!**/.vscode/launch.template.json
!**/.vscode/extensions.json
!tests/cases/projects/projectOption/**/node_modules
!tests/cases/projects/NodeModulesSearch/**/*
!tests/baselines/reference/project/nodeModules*/**/*
.idea
yarn.lock
yarn-error.log
.parallelperf.*
tests/baselines/reference/dt
.failed-tests
TEST-results.xml
package-lock.json
.eslintcache
*v8.log
# dotenv environment variable files
.env
.env.*
.env.development.local
.env.test.local
.env.production.local
.env.local
# Fresh build directory
_fresh/
# npm dependencies
node_modules/
# project specific
logs/
__pycache__
ml/filter/runs
ml/pred/runs
ml/pred/checkpoints
ml/pred/observed
ml/data/
ml/filter/checkpoints
filter/runs
pred/runs
pred/checkpoints
data/
filter/checkpoints
scripts
model/
.astro
# Database
*.dump
*.db
*.sqlite
*.sqlite3
data/
redis/
# Build
dist/
build/
docker-compose.yml
ucaptcha-config.yaml

.idea/.gitignore (vendored, 10 changes)

@@ -1,10 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml
dataSources.xml
MarsCodeWorkspaceAppSettings.xml

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="BunSettings">
<option name="bunPath" value="$USER_HOME$/.bun/bin/bun" />
</component>
</project>

@@ -1,55 +0,0 @@
<component name="ProjectCodeStyleConfiguration">
<code_scheme name="Project" version="173">
<option name="LINE_SEPARATOR" value="&#10;" />
<HTMLCodeStyleSettings>
<option name="HTML_SPACE_INSIDE_EMPTY_TAG" value="true" />
</HTMLCodeStyleSettings>
<JSCodeStyleSettings version="0">
<option name="FORCE_SEMICOLON_STYLE" value="true" />
<option name="SPACE_BEFORE_FUNCTION_LEFT_PARENTH" value="false" />
<option name="FORCE_QUOTE_STYlE" value="true" />
<option name="ENFORCE_TRAILING_COMMA" value="Remove" />
<option name="SPACES_WITHIN_OBJECT_LITERAL_BRACES" value="true" />
<option name="SPACES_WITHIN_IMPORTS" value="true" />
</JSCodeStyleSettings>
<TypeScriptCodeStyleSettings version="0">
<option name="FORCE_SEMICOLON_STYLE" value="true" />
<option name="SPACE_BEFORE_FUNCTION_LEFT_PARENTH" value="false" />
<option name="FORCE_QUOTE_STYlE" value="true" />
<option name="ENFORCE_TRAILING_COMMA" value="Remove" />
<option name="SPACES_WITHIN_OBJECT_LITERAL_BRACES" value="true" />
<option name="SPACES_WITHIN_IMPORTS" value="true" />
</TypeScriptCodeStyleSettings>
<VueCodeStyleSettings>
<option name="INTERPOLATION_NEW_LINE_AFTER_START_DELIMITER" value="false" />
<option name="INTERPOLATION_NEW_LINE_BEFORE_END_DELIMITER" value="false" />
</VueCodeStyleSettings>
<codeStyleSettings language="HTML">
<option name="SOFT_MARGINS" value="120" />
<indentOptions>
<option name="CONTINUATION_INDENT_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="true" />
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="JavaScript">
<option name="SOFT_MARGINS" value="120" />
<indentOptions>
<option name="USE_TAB_CHARACTER" value="true" />
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="TypeScript">
<option name="SOFT_MARGINS" value="120" />
<indentOptions>
<option name="USE_TAB_CHARACTER" value="true" />
</indentOptions>
</codeStyleSettings>
<codeStyleSettings language="Vue">
<option name="SOFT_MARGINS" value="120" />
<indentOptions>
<option name="INDENT_SIZE" value="4" />
<option name="TAB_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="true" />
</indentOptions>
</codeStyleSettings>
</code_scheme>
</component>

@@ -1,5 +0,0 @@
<component name="ProjectCodeStyleConfiguration">
<state>
<option name="USE_PER_PROJECT_SETTINGS" value="true" />
</state>
</component>

@@ -1,37 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.tmp" />
<excludeFolder url="file://$MODULE_DIR$/temp" />
<excludeFolder url="file://$MODULE_DIR$/tmp" />
<excludeFolder url="file://$MODULE_DIR$/ml/data" />
<excludeFolder url="file://$MODULE_DIR$/doc" />
<excludeFolder url="file://$MODULE_DIR$/ml/filter/checkpoints" />
<excludeFolder url="file://$MODULE_DIR$/ml/filter/runs" />
<excludeFolder url="file://$MODULE_DIR$/ml/lab/data" />
<excludeFolder url="file://$MODULE_DIR$/ml/lab/temp" />
<excludeFolder url="file://$MODULE_DIR$/logs" />
<excludeFolder url="file://$MODULE_DIR$/model" />
<excludeFolder url="file://$MODULE_DIR$/src/db" />
<excludeFolder url="file://$MODULE_DIR$/.idea" />
<excludeFolder url="file://$MODULE_DIR$/.vscode" />
<excludeFolder url="file://$MODULE_DIR$/.zed" />
<excludeFolder url="file://$MODULE_DIR$/packages/frontend/.astro" />
<excludeFolder url="file://$MODULE_DIR$/scripts" />
<excludeFolder url="file://$MODULE_DIR$/.astro" />
<excludeFolder url="file://$MODULE_DIR$/ml/pred/checkpoints" />
<excludeFolder url="file://$MODULE_DIR$/ml/pred/observed" />
<excludeFolder url="file://$MODULE_DIR$/ml/pred/runs" />
<excludeFolder url="file://$MODULE_DIR$/packages/backend/logs" />
<excludeFolder url="file://$MODULE_DIR$/packages/core/net/logs" />
<excludeFolder url="file://$MODULE_DIR$/packages/crawler/logs" />
<excludeFolder url="file://$MODULE_DIR$/data" />
<excludeFolder url="file://$MODULE_DIR$/redis" />
<excludeFolder url="file://$MODULE_DIR$/ml" />
<excludeFolder url="file://$MODULE_DIR$/src" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

@@ -1,7 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="DenoSettings">
<option name="denoInit" value="{&#10; &quot;enable&quot;: true,&#10; &quot;lint&quot;: true,&#10; &quot;unstable&quot;: true,&#10; &quot;importMap&quot;: &quot;import_map.json&quot;,&#10; &quot;config&quot;: &quot;deno.json&quot;,&#10; &quot;fmt&quot;: {&#10; &quot;useTabs&quot;: true,&#10; &quot;lineWidth&quot;: 120,&#10; &quot;indentWidth&quot;: 4,&#10; &quot;semiColons&quot;: true,&#10; &quot;proseWrap&quot;: &quot;always&quot;&#10; }&#10;}" />
<option name="useDenoValue" value="DISABLE" />
</component>
</project>

@@ -1,36 +0,0 @@
<component name="InspectionProjectProfileManager">
<profile version="1.0">
<option name="myName" value="Project Default" />
<option name="scopesOrder">
<list>
<option value="Astro" />
<option value="All Changed Files" />
<option value="Open Files" />
<option value="Project Files" />
<option value="Scratches and Consoles" />
<option value="Tests" />
</list>
</option>
<inspection_tool class="ES6UnusedImports" enabled="true" level="WARNING" enabled_by_default="true">
<scope name="Astro" level="INFORMATION" enabled="false" editorAttributes="INFORMATION_ATTRIBUTES" />
</inspection_tool>
<inspection_tool class="GrazieInspection" enabled="false" level="GRAMMAR_ERROR" enabled_by_default="false" />
<inspection_tool class="HtmlUnknownAttribute" enabled="true" level="WARNING" enabled_by_default="true">
<option name="myValues">
<value>
<list size="1">
<item index="0" class="java.lang.String" itemvalue="autocorrect" />
</list>
</value>
</option>
<option name="myCustomValuesEnabled" value="true" />
</inspection_tool>
<inspection_tool class="LanguageDetectionInspection" enabled="false" level="WARNING" enabled_by_default="false" />
<inspection_tool class="SpellCheckingInspection" enabled="false" level="TYPO" enabled_by_default="false">
<option name="processCode" value="true" />
<option name="processLiterals" value="true" />
<option name="processComments" value="true" />
</inspection_tool>
<inspection_tool class="TsLint" enabled="true" level="WARNING" enabled_by_default="true" />
</profile>
</component>

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/cvsa.iml" filepath="$PROJECT_DIR$/.idea/cvsa.iml" />
</modules>
</component>
</project>

@@ -1,3 +0,0 @@
<component name="DependencyValidationManager">
<scope name="Astro" pattern="file:*.astro" />
</component>

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="SqlDialectMappings">
<file url="PROJECT" dialect="PostgreSQL" />
</component>
</project>

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="" vcs="Git" />
</component>
</project>

@@ -1,8 +0,0 @@
{
"useTabs": true,
"tabWidth": 4,
"trailingComma": "none",
"singleQuote": false,
"printWidth": 120,
"endOfLine": "lf"
}

@@ -3,9 +3,3 @@ data
*.svg
*.txt
*.md
*config*
Inter.css
MiSans.css
*.yaml
*.yml
*.mdx

.vscode/extensions.json (vendored, new file, 6 changes)

@@ -0,0 +1,6 @@
{
"recommendations": [
"denoland.vscode-deno",
"bradlc.vscode-tailwindcss"
]
}

.zed/settings.json (new file, 35 changes)

@@ -0,0 +1,35 @@
// Folder-specific settings
//
// For a full list of overridable settings, and general information on folder-specific settings,
// see the documentation: https://zed.dev/docs/configuring-zed#settings-files
{
"lsp": {
"deno": {
"settings": {
"deno": {
"enable": true
}
}
}
},
"languages": {
"TypeScript": {
"language_servers": [
"deno",
"!typescript-language-server",
"!vtsls",
"!eslint"
],
"formatter": "language_server"
},
"TSX": {
"language_servers": [
"deno",
"!typescript-language-server",
"!vtsls",
"!eslint"
],
"formatter": "language_server"
}
}
}

@@ -1,23 +0,0 @@
FROM oven/bun:1.2.8-debian
WORKDIR /app
COPY ./packages/core ./core
COPY ./packages/backend/package.json ./packages/backend/bun.lock ./backend/
RUN apt update && apt install -y curl
RUN ln -s /bin/uname /usr/bin/uname
RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)"
WORKDIR backend
RUN bun install
COPY ./packages/backend/ .
RUN mkdir -p /app/logs
CMD ["bun", "start"]

@@ -1,19 +0,0 @@
FROM oven/bun:1.2.8-debian
WORKDIR /app
COPY . .
RUN bun i
RUN mkdir -p /app/logs
RUN apt update && apt install -y curl
RUN ln -s /bin/uname /usr/bin/uname
RUN /bin/bash -c "$(curl -fsSL https://aliyuncli.alicdn.com/install.sh)"
WORKDIR packages/crawler
CMD ["bun", "all"]

@@ -1,23 +0,0 @@
FROM oven/bun
ARG BACKEND_URL
WORKDIR /app
COPY . .
RUN bun install
WORKDIR packages/frontend
RUN bun run build
ENV HOST=0.0.0.0
ENV PORT=4321
ENV BACKEND_URL=${BACKEND_URL}
EXPOSE 4321
RUN mkdir -p /app/logs
CMD ["bun", "/app/packages/frontend/dist/server/entry.mjs"]

@@ -1,14 +0,0 @@
FROM node:lts-slim AS production
WORKDIR /app
COPY ./packages/next/.next ./.next
COPY ./packages/next/public ./public
COPY ./packages/next/package.json ./package.json
COPY ./packages/next/node_modules ./node_modules
ENV NODE_ENV production
EXPOSE 7400
CMD ["npm", "start"]

@@ -2,11 +2,6 @@
「中V档案馆」 (CVSA) is a website dedicated to collecting and showcasing Chinese vocal synthesis works and related information.
## News - Test Version Online
The test frontend and API of CVSA are now online, at [projectcvsa.com](https://projectcvsa.com) and [api.projectcvsa.com](https://api.projectcvsa.com) respectively.
For API usage, see the [API documentation](https://docs.projectcvsa.com/api-doc/).
## Background and Related Work
Across the Internet, the main sites that collect and organize information related to Chinese vocal synthesis or Chinese virtual singers (often abbreviated as 中V or VC) in a relatively systematic and comprehensive way are the following:
@@ -36,7 +31,7 @@ For API usage, see the [API documentation](https://docs.projectcvsa.com/api-doc/).
## Technical Architecture
See the [CVSA documentation](https://docs.projectcvsa.com/).
See the [CVSA documentation](https://cvsa.gitbook.io/).
## Open License

bun.lock (1732 changes)

File diff suppressed because it is too large.

components/Button.tsx (new file, 12 changes)

@@ -0,0 +1,12 @@
import { JSX } from "preact";
import { IS_BROWSER } from "$fresh/runtime.ts";
export function Button(props: JSX.HTMLAttributes<HTMLButtonElement>) {
return (
<button
{...props}
disabled={!IS_BROWSER || props.disabled}
class="px-2 py-1 border-gray-500 border-2 rounded bg-white hover:bg-gray-200 transition-colors"
/>
);
}

data/filter/1.py (new file, 55 changes)

@@ -0,0 +1,55 @@
import json
import random


def process_data(input_file, output_file):
    """
    Read the input file and find rows where 'model' and 'human' disagree.
    Delete the 'model' key, rename 'human' to 'label',
    then append the processed rows to the output file.
    Before writing, all samples already in output_file are loaded
    and used to deduplicate by the 'aid' key.

    Args:
        input_file (str): path to the input file
        output_file (str): path to the output file
    """
    # Load existing rows from output_file for deduplication
    existing_data = set()
    try:
        with open(output_file, 'r', encoding='utf-8') as f_out:
            for line in f_out:
                try:
                    data = json.loads(line)
                    existing_data.add(data['aid'])
                except json.JSONDecodeError:
                    pass  # ignore JSON decode errors and continue with the next line
    except FileNotFoundError:
        pass  # ignore a missing output file

    with open(input_file, 'r', encoding='utf-8') as f_in, open(output_file, 'a', encoding='utf-8') as f_out:
        for line in f_in:
            try:
                data = json.loads(line)
                if data['model'] != data['human'] or random.random() < 0.2:
                    if data['aid'] not in existing_data:  # skip aids we have already seen
                        del data['model']
                        data['label'] = data['human']
                        del data['human']
                        f_out.write(json.dumps(data, ensure_ascii=False) + '\n')
                        existing_data.add(data['aid'])  # remember the new aid
            except json.JSONDecodeError as e:
                print(f"JSON decode error: {e}")
                print(f"Offending line: {line.strip()}")
            except KeyError as e:
                print(f"KeyError: key '{e}' does not exist")
                print(f"Offending line: {line.strip()}")


# Process the data
input_file = 'real_test.jsonl'
output_file = 'labeled_data.jsonl'
process_data(input_file, output_file)
print(f"Done; results written to {output_file}")

deno.json (new file, 60 changes)

@@ -0,0 +1,60 @@
{
"lock": false,
"tasks": {
"crawl-raw-bili": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/insertAidsToDB.ts",
"crawl-bili-aids": "deno --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run src/db/raw/fetchAids.ts",
"check": "deno fmt --check && deno lint && deno check **/*.ts && deno check **/*.tsx",
"cli": "echo \"import '\\$fresh/src/dev/cli.ts'\" | deno run --unstable -A -",
"manifest": "deno task cli manifest $(pwd)",
"start": "deno run -A --watch=static/,routes/ dev.ts",
"build": "deno run -A dev.ts build",
"preview": "deno run -A main.ts",
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
},
"lint": {
"rules": {
"tags": ["fresh", "recommended"]
}
},
"exclude": ["**/_fresh/*"],
"imports": {
"@std/assert": "jsr:@std/assert@1",
"$fresh/": "https://deno.land/x/fresh@1.7.3/",
"preact": "https://esm.sh/preact@10.22.0",
"preact/": "https://esm.sh/preact@10.22.0/",
"@preact/signals": "https://esm.sh/*@preact/signals@1.2.2",
"@preact/signals-core": "https://esm.sh/*@preact/signals-core@1.5.1",
"tailwindcss": "npm:tailwindcss@3.4.1",
"tailwindcss/": "npm:/tailwindcss@3.4.1/",
"tailwindcss/plugin": "npm:/tailwindcss@3.4.1/plugin.js",
"$std/": "https://deno.land/std@0.216.0/",
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq",
"lib/": "./lib/",
"ioredis": "npm:ioredis",
"@bull-board/api": "npm:@bull-board/api",
"@bull-board/express": "npm:@bull-board/express",
"express": "npm:express",
"src/": "./src/",
"onnxruntime": "npm:onnxruntime-node@1.19.2",
"chalk": "npm:chalk"
},
"compilerOptions": {
"jsx": "react-jsx",
"jsxImportSource": "preact"
},
"nodeModulesDir": "auto",
"fmt": {
"useTabs": true,
"lineWidth": 120,
"indentWidth": 4,
"semiColons": true,
"proseWrap": "always"
}
}

dev.ts (new executable file, 7 changes)

@@ -0,0 +1,7 @@
#!/usr/bin/env -S deno run -A --watch=static/,routes/
import dev from "$fresh/dev.ts";
import config from "./fresh.config.ts";
import "$std/dotenv/load.ts";
await dev(import.meta.url, "./main.ts", config);

@@ -1,21 +1,22 @@
# Table of contents
* [Welcome](README.md)
- [Welcome](README.md)
## About
* [About CVSA Project](about/this-project.md)
* [Scope of Inclusion](about/scope-of-inclusion.md)
- [About CVSA Project](about/this-project.md)
- [Scope of Inclusion](about/scope-of-inclusion.md)
## Architecture
* [Overview](architecure/overview.md)
* [Crawler](architecure/crawler.md)
* [Database Structure](architecure/database-structure/README.md)
* [Type of Song](architecure/database-structure/type-of-song.md)
* [Artificial Intelligence](architecure/artificial-intelligence.md)
- [Overview](architecure/overview.md)
- [Database Structure](architecure/database-structure/README.md)
- [Type of Song](architecure/database-structure/type-of-song.md)
- [Message Queue](architecure/message-queue/README.md)
- [VideoTagsQueue](architecure/message-queue/videotagsqueue.md)
- [Artificial Intelligence](architecure/artificial-intelligence.md)
## API Doc
* [Catalog](api-doc/catalog.md)
* [Songs](api-doc/songs.md)
- [Catalog](api-doc/catalog.md)
- [Songs](api-doc/songs.md)

@@ -7,34 +7,13 @@ For a **song**, it must meet the following conditions to be included in CVSA:
### Category 30
In principle, the songs must be featured in a video that is categorized under the VOCALOID·UTAU (ID 30) category in
[Bilibili](https://en.wikipedia.org/wiki/Bilibili) in order to be observed by our
[automation program](../architecure/overview.md#crawler). We welcome editors to manually add songs that have not been
uploaded to bilibili / categorized under this category.
In principle, the songs featured in CVSA must be included in a video categorized under VOCALOID·UTAU (ID 30) that is
posted on Bilibili. In some special cases, this rule may not be enforced.&#x20;
#### NEWS
### At Least One Line of Chinese
Recently, Bilibili seems to be taking the sub-category offline. This means the VOCALOID·UTAU category can no longer be
entered from the frontend, and producers can no longer upload videos to this category (instead, they can only choose the
parent category "Music").&#x20;
According to our experiments, Bilibili still retains the code logic of sub-categories in the backend, and newly
published songs may still be in the VOCALOID·UTAU sub-category, and the related APIs can still work normally. However,
there are [reports](https://www.bilibili.com/opus/1041223385394184199) that some of the new songs have been placed under
the "Music General" sub-category.\
We are still waiting for Bilibili's follow-up actions, and in the future, we may adjust the scope of our automated
program's crawling.
### At Least One Line of Chinese / Chinese Virtual Singer
The lyrics of the song must contain at least one line in Chinese. Otherwise, if the lyrics of the song do not contain
Chinese, it will be included in the CVSA only if a Chinese virtual singer has been used.
We define a **Chinese virtual singer** as follows:
1. The singer primarily uses a Chinese voicebank (i.e., the most widely used voicebank for the singer is Chinese)
2. The singer is operated by a company, organization, individual or group located in Mainland China, Hong Kong, Macau or
Taiwan.
The lyrics of the song must contain at least one line in Chinese. This means that even if a voicebank that only supports
Chinese is used, if the lyrics of the song do not contain Chinese, it will not be included in the CVSA.
### Using Vocal Synthesizer

@@ -12,10 +12,3 @@ Located at `/filter/` under the project root dir, it classifies a video in the
- 0: Not related to Chinese vocal synthesis
- 1: A original song with Chinese vocal synthesis
- 2: A cover/remix song with Chinese vocal synthesis
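As a concrete illustration, the following is a minimal sketch of how this classifier is invoked, based on the `classifyVideo` and `initializeModels` helpers from `lib/ml/filter_inference.ts` elsewhere in this diff; the sample inputs are invented.

```ts
// Sketch: classify one video with the filter. Sample values are made up.
import { classifyVideo, initializeModels } from "lib/ml/filter_inference.ts";

await initializeModels(); // loads the tokenizer and both ONNX sessions once
const label = await classifyVideo("some title", "some description", "tag1,tag2", 170001);
// label is the argmax over the three classes listed above:
// 0 = not related, 1 = original song, 2 = cover/remix
console.log(`Filter label: ${label}`);
```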
### The Predictor
Located at `/pred/` under the project root dir, it predicts the future views of a video. This is a regression model that
takes the historical view trend of a video, other contextual information (such as the current time), and the future time
points to be predicted as feature inputs, and outputs the increment in the video's view count from "now" to the
specified future time point.
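To make the predictor's contract concrete, here is a hedged TypeScript sketch of the inputs and output described above; the field names are assumptions for illustration, not the model's actual interface.

```ts
// Assumed feature/output shape of the predictor; illustrative only.
interface PredictorInput {
	viewHistory: { timestamp: number; views: number }[]; // historical view trend
	contextTime: number; // contextual information, e.g. the current time
	targetTime: number; // the future time point to predict for
}

// Output: the predicted increment in views from "now" to targetTime.
type PredictorOutput = number;
```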

@@ -1,4 +0,0 @@
# Crawler
A central aspect of CVSA's technical design is its emphasis on automation. The data collection process within the `crawler` is orchestrated using a message queue powered by [BullMQ](https://bullmq.io/). This enables concurrent processing of various tasks involved in the data lifecycle. State management and data persistence are handled by a combination of Redis for caching and real-time data, and PostgreSQL as the primary database.
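For readers unfamiliar with BullMQ, a minimal sketch of the queue/worker pattern described above follows; the queue name, payload, and processing body are illustrative rather than the crawler's actual wiring, and the Redis connection mirrors `lib/db/redis.ts` from this diff.

```ts
// Sketch: one producer enqueues a job, one worker processes it concurrently.
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";

const connection = new Redis({ maxRetriesPerRequest: null });

const queue = new Queue("videoTags", { connection });
await queue.add("getVideoTags", { aid: 170001 });

new Worker("videoTags", async (job) => {
	// fetch and persist data for job.data.aid here
	return 0;
}, { connection });
```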

@@ -10,6 +10,3 @@ following tables:
- all\_data: metadata of all videos in [category 30](../../about/scope-of-inclusion.md#category-30).
- labelling\_result: Contains labels of videos in `all_data`, tagged by our
[AI system](../artificial-intelligence.md#the-filter).
- video\_snapshot: Statistical data of videos fetched regularly (e.g., the number of views); we call this fetch
process a "snapshot".
- snapshot\_schedule: The scheduling information for video snapshots.
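As an illustration of how these tables are read, here is a small sketch in the query style of `lib/db/snapshot.ts` from later in this diff; the column selection is an assumption based on that file, not a schema reference.

```ts
// Sketch: read the most recent snapshots of one video from video_snapshot.
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";

export async function getRecentSnapshots(client: Client, aid: number) {
	const result = await client.queryObject<{ views: number; created_at: string }>(
		`SELECT views, created_at FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 10`,
		[aid],
	);
	return result.rows;
}
```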

@@ -0,0 +1 @@
# Message Queue

@@ -0,0 +1,12 @@
# VideoTagsQueue
### Jobs
The VideoTagsQueue contains two jobs: `getVideoTags` and `getVideosTags`. The former is used to fetch the tags of a
video, and the latter is responsible for scheduling the former.
### Return value
The return values of the two jobs follow the table below:

| Return Value | Description |
| ------------ | ----------- |
| 0 | In `getVideoTags`: the tags were successfully fetched.<br>In `getVideosTags`: all null-tags videos had a corresponding job successfully queued. |
| 1 | In `getVideoTags`: a `fetch` error occurred during the job. |
| 2 | In `getVideoTags`: the rate limit set in NetScheduler was reached. |
| 3 | In `getVideoTags`: no aid was provided in the job data. |
| 4 | In `getVideosTags`: there is no video with NULL `tags`. |
| 1xx | In `getVideosTags`: the number of tasks in the queue exceeded the limit, so `getVideosTags` stopped adding tasks. `xx` is the number of jobs added to the queue during execution. |
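A caller might interpret these codes as in the sketch below; this helper is illustrative and not part of the codebase.

```ts
// Maps the documented VideoTagsQueue return codes to readable messages.
function describeTagsResult(code: number): string {
	if (code >= 100 && code < 200) {
		return `queue limit reached; ${code - 100} jobs were added before stopping`;
	}
	switch (code) {
		case 0:
			return "success";
		case 1:
			return "fetch error during the job";
		case 2:
			return "rate limit set in NetScheduler reached";
		case 3:
			return "no aid provided in the job data";
		case 4:
			return "no video with NULL tags";
		default:
			return "unknown code";
	}
}
```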

@@ -1,4 +1,5 @@
---
icon: globe-pointer
layout:
title:
visible: true
@@ -14,29 +15,5 @@ layout:
# Overview
The CVSA is a [monorepo](https://en.wikipedia.org/wiki/Monorepo) codebase, mainly using TypeScript as the development language. With [Deno workspace](https://docs.deno.com/runtime/fundamentals/workspaces/), the major part of the codebase is under `packages/`.&#x20;
**Project structure:**
```
cvsa
├── deno.json
├── packages
│ ├── backend
│ ├── core
│ ├── crawler
│ └── frontend
└── README.md
```
**Package Breakdown:**
* **`backend`**: This package houses the server-side logic, built with the [Hono](https://hono.dev/) web framework. It's responsible for interacting with the database and exposing data through REST and GraphQL APIs for consumption by the frontend, internal applications, and third-party developers.
* **`frontend`**: The user-facing web interface of CVSA is developed using [Astro](https://astro.build/). This package handles the presentation layer, displaying information fetched from the database.
* **`crawler`**: This automated data collection system is a key component of CVSA. It's designed to automatically discover and gather new song data from bilibili, as well as track relevant statistics over time.
* **`core`**: This package contains reusable and generic code that is utilized across multiple workspaces within the CVSA monorepo.
### Crawler
Automation is the biggest highlight of CVSA's technical design. The data collection process within the `crawler` is orchestrated using a message queue powered by [BullMQ](https://bullmq.io/). This enables concurrent processing of various tasks involved in the data collection lifecycle. State management and data persistence are handled by a combination of Redis for caching and real-time data, and PostgreSQL as the primary database.
Automation is the biggest highlight of CVSA's technical design. To achieve this, we use a message queue powered by
[BullMQ](https://bullmq.io/) to concurrently process various tasks in the data collection life cycle.

@@ -1,106 +0,0 @@
openapi: 3.0.0
info:
  title: CVSA API
  version: v1
servers:
  - url: https://api.projectcvsa.com
paths:
  /video/{id}/snapshots:
    get:
      summary: Get the snapshot list of a video
      description: Fetch the snapshot list of a video by its ID. The video ID can be a number prefixed with "av", a 12-character alphanumeric string starting with "BV", or a positive integer.
      parameters:
        - in: path
          name: id
          required: true
          schema:
            type: string
          description: "Video ID (e.g. av78977256, BV1KJ411C7CW, 78977256)"
        - in: query
          name: ps
          schema:
            type: integer
            minimum: 1
          description: Number of snapshots per page (pageSize); defaults to 1000.
        - in: query
          name: pn
          schema:
            type: integer
            minimum: 1
          description: Page number (pageNumber), for paginated queries. Only one of offset and pn may be used.
        - in: query
          name: offset
          schema:
            type: integer
            minimum: 1
          description: Offset, for offset-based queries. Only one of offset and pn may be used.
        - in: query
          name: reverse
          schema:
            type: boolean
          description: Whether to sort in reverse order (oldest first); defaults to false.
      responses:
        '200':
          description: Snapshot list fetched successfully
          content:
            application/json:
              schema:
                type: array
                items:
                  type: object
                  properties:
                    id:
                      type: integer
                      description: Snapshot ID
                    aid:
                      type: integer
                      description: The video's av number
                    views:
                      type: integer
                      description: View count
                    coins:
                      type: integer
                      description: Coin count
                    likes:
                      type: integer
                      description: Like count
                    favorites:
                      type: integer
                      description: Favorite count
                    shares:
                      type: integer
                      description: Share count
                    danmakus:
                      type: integer
                      description: Danmaku count
                    replies:
                      type: integer
                      description: Reply count
        '400':
          description: Invalid query parameters
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
                    description: Error message
                  errors:
                    type: object
                    description: Detailed error information
        '500':
          description: Internal server error
          content:
            application/json:
              schema:
                type: object
                properties:
                  message:
                    type: string
                    description: Error message
                  error:
                    type: object
                    description: Detailed error information
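A hedged usage sketch for the endpoint documented above; the av number is an example, and only a few of the documented response fields are printed.

```ts
// Sketch: fetch up to 10 snapshots for one video from the public API.
const res = await fetch("https://api.projectcvsa.com/video/av78977256/snapshots?ps=10");
if (!res.ok) {
	console.error(`Request failed with status ${res.status}`);
} else {
	const snapshots: { id: number; aid: number; views: number }[] = await res.json();
	for (const s of snapshots) {
		console.log(s.aid, s.views);
	}
}
```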

@@ -1,11 +1,11 @@
# Table of contents
* [Welcome](README.md)
- [Welcome](README.md)
## About <a href="#about" id="about"></a>
* [About This Project](about/this-project.md)
* [Scope of Inclusion](about/scope-of-inclusion.md)
- [About This Project](about/this-project.md)
- [Scope of Inclusion](about/scope-of-inclusion.md)
## Technical Architecture <a href="#architecture" id="architecture"></a>
@@ -14,9 +14,9 @@
- [Type of Song](architecture/database-structure/type-of-song.md)
- [Artificial Intelligence](architecture/artificial-intelligence.md)
- [Message Queue](architecture/message-queue/README.md)
- [LatestVideosQueue](architecture/message-queue/latestvideosqueue-dui-lie.md)
- [VideoTagsQueue](architecture/message-queue/video-tags-queue.md)
## API Documentation <a href="#api-doc" id="api-doc"></a>
* [Catalog](api-doc/catalog.md)
* [Video Snapshots](api-doc/video-snapshot.md)
- [Catalog](api-doc/catalog.md)
- [Songs](api-doc/songs.md)

@@ -1,4 +1,3 @@
# Catalog
* [Video Snapshots](video-snapshot.md)
- [Songs](songs.md)

doc/zh/api-doc/songs.md (new file, 3 changes)

@@ -0,0 +1,3 @@
# Songs
Not yet implemented.

@@ -1,6 +0,0 @@
# Video Snapshots
{% openapi src="../.gitbook/assets/1.yaml" path="/video/{id}/snapshots" method="get" %}
[1.yaml](../.gitbook/assets/1.yaml)
{% endopenapi %}

@@ -2,14 +2,9 @@
CVSA uses [PostgreSQL](https://www.postgresql.org/) as its database.
CVSA has designed two
All of CVSA's public data (excluding users' personal data) is stored in a database named `cvsa_main`, which contains the following tables:
- songs: stores the main information about songs
- bilibili\_user: stores snapshots of Bilibili user information
- bilibili\_metadata: metadata of all videos in [category 30](../../about/scope-of-inclusion.md#vocaloiduatu-fen-qu)
- bili\_user: stores snapshots of Bilibili user information
- all\_data: metadata of all videos in [category 30](../../about/scope-of-inclusion.md#vocaloiduatu-fen-qu)
- labelling\_result: contains the labels of videos in `all_data` tagged by our AI system.
- latest\_video\_snapshot: stores the latest snapshot of each video
- video\_snapshot: stores snapshots of videos (the statistics of a video at a specific time: views, likes, etc.)
- snapshot\_schedule: scheduling information for video snapshots (an auxiliary table)

@@ -1 +0,0 @@
# LatestVideosQueue

@@ -0,0 +1,15 @@
---
description: Information about the VideoTagsQueue queue.
---
# VideoTagsQueue
### Jobs
The video tags queue contains two jobs: `getVideoTags` and `getVideosTags`. The former fetches a video's tags, and the latter schedules the former.
### Return values
The return values of the two jobs follow the table below:

| Return Value | Description |
| ------------ | ----------- |
| 0 | In `getVideoTags`: the tags were successfully fetched.<br>In `getVideosTags`: jobs for all untagged videos were successfully queued. |
| 1 | In `getVideoTags`: a `fetch` error occurred during the job. |
| 2 | In `getVideoTags`: the rate limit set in NetScheduler was reached. |
| 3 | In `getVideoTags`: no aid was provided in the job data. |
| 4 | In `getVideosTags`: no video has `tags` equal to NULL. |
| 1xx | In `getVideosTags`: the number of tasks in the queue exceeded the limit, so `getVideosTags` stopped adding tasks. `xx` is the number of jobs added to the queue during execution. |

@@ -1,4 +1,5 @@
---
icon: globe-pointer
layout:
title:
visible: true
@@ -14,13 +15,4 @@ layout:
# Overview
The CVSA project is divided into three components: **crawler**, **frontend**, and **backend**.
### **crawler**
Located under `packages/crawler` in the project directory, it is responsible for:
- Crawling new videos and cataloguing works
- Continuously monitoring statistics such as video view counts
The whole crawler is driven by BullMQ message queues and uses Redis and PostgreSQL to manage state.
Automation is the biggest highlight of CVSA's technical design. To achieve it, we use a message queue powered by BullMQ to concurrently process the tasks in the data-collection life cycle.

@@ -1,71 +0,0 @@
version: '3.8'

services:
  db:
    image: postgres:17
    ports:
      - "5431:5432"
    environment:
      POSTGRES_USER: cvsa
      POSTGRES_PASSWORD: ""
      POSTGRES_DB: cvsa_main
    volumes:
      - ./data:/var/lib/postgresql/data
  redis:
    image: redis:latest
    ports:
      - "6378:6379"
    volumes:
      - ./redis/data:/data
      - ./redis/redis.conf:/usr/local/etc/redis/redis.conf
      - ./redis/logs:/logs
  frontend:
    build:
      context: .
      dockerfile: Dockerfile.frontend
    ports:
      - "4321:4321"
    environment:
      - HOST=0.0.0.0
      - PORT=4321
      - DB_HOST=db
      - DB_NAME=cvsa_main
      - DB_NAME_CRED=cvsa_cred
      - DB_USER=cvsa
      - DB_PORT=5432
      - DB_PASSWORD=""
      - LOG_VERBOSE=/app/logs/verbose.log
      - LOG_WARN=/app/logs/warn.log
      - LOG_ERR=/app/logs/error.log
    depends_on:
      - db
    volumes:
      - /path/to/your/logs:/app/logs
  backend:
    build:
      context: .
      dockerfile: Dockerfile.backend
    ports:
      - "8000:8000"
    environment:
      - HOST=0.0.0.0
      - DB_HOST=db
      - DB_NAME=cvsa_main
      - DB_NAME_CRED=cvsa_cred
      - DB_USER=cvsa
      - DB_PORT=5432
      - DB_PASSWORD=""
      - LOG_VERBOSE=/app/logs/verbose.log
      - LOG_WARN=/app/logs/warn.log
      - LOG_ERR=/app/logs/error.log
      - REDIS_HOST=redis
      - REDIS_PORT=6379
    depends_on:
      - db
    volumes:
      - /path/to/your/logs:/app/logs

volumes:
  db_data:

fresh.config.ts (new file, 6 changes)

@@ -0,0 +1,6 @@
import { defineConfig } from "$fresh/server.ts";
import tailwind from "$fresh/plugins/tailwind.ts";
export default defineConfig({
plugins: [tailwind()],
});

fresh.gen.ts (new file, 27 changes)

@@ -0,0 +1,27 @@
// DO NOT EDIT. This file is generated by Fresh.
// This file SHOULD be checked into source version control.
// This file is automatically updated during development when running `dev.ts`.
import * as $_404 from "./routes/_404.tsx";
import * as $_app from "./routes/_app.tsx";
import * as $api_joke from "./routes/api/joke.ts";
import * as $greet_name_ from "./routes/greet/[name].tsx";
import * as $index from "./routes/index.tsx";
import * as $Counter from "./islands/Counter.tsx";
import type { Manifest } from "$fresh/server.ts";
const manifest = {
routes: {
"./routes/_404.tsx": $_404,
"./routes/_app.tsx": $_app,
"./routes/api/joke.ts": $api_joke,
"./routes/greet/[name].tsx": $greet_name_,
"./routes/index.tsx": $index,
},
islands: {
"./islands/Counter.tsx": $Counter,
},
baseUrl: import.meta.url,
} satisfies Manifest;
export default manifest;

islands/Counter.tsx (new file, 16 changes)

@@ -0,0 +1,16 @@
import type { Signal } from "@preact/signals";
import { Button } from "../components/Button.tsx";
interface CounterProps {
count: Signal<number>;
}
export default function Counter(props: CounterProps) {
return (
<div class="flex gap-8 py-6">
<Button onClick={() => props.count.value -= 1}>-1</Button>
<p class="text-3xl tabular-nums">{props.count}</p>
<Button onClick={() => props.count.value += 1}>+1</Button>
</div>
);
}


lib/db/allData.ts (new file, 67 changes)

@@ -0,0 +1,67 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { AllDataType, BiliUserType } from "lib/db/schema.d.ts";
import { modelVersion } from "lib/ml/filter_inference.ts";
export async function videoExistsInAllData(client: Client, aid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`, [aid])
.then((result) => result.rows[0].exists);
}
export async function userExistsInBiliUsers(client: Client, uid: number) {
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [
uid,
]).then((result) => result.rows[0].exists); // return the boolean, not the query result object
}
export async function getUnlabelledVideos(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(
`SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`,
);
return queryResult.rows.map((row) => row.aid);
}
export async function insertVideoLabel(client: Client, aid: number, label: number) {
return await client.queryObject(
`INSERT INTO labelling_result (aid, label, model_version) VALUES ($1, $2, $3) ON CONFLICT (aid, model_version) DO NOTHING`,
[aid, label, modelVersion],
);
}
export async function getVideoInfoFromAllData(client: Client, aid: number) {
const queryResult = await client.queryObject<AllDataType>(
`SELECT * FROM bilibili_metadata WHERE aid = $1`,
[aid],
);
const row = queryResult.rows[0];
let authorInfo = "";
if (row.uid && await userExistsInBiliUsers(client, row.uid)) {
const q = await client.queryObject<BiliUserType>(
`SELECT * FROM bilibili_user WHERE uid = $1`,
[row.uid],
);
const userRow = q.rows[0];
if (userRow) {
authorInfo = userRow.desc;
}
}
return {
title: row.title,
description: row.description,
tags: row.tags,
author_info: authorInfo,
};
}
export async function getUnArchivedBiliUsers(client: Client) {
const queryResult = await client.queryObject<{ uid: number }>(
`
SELECT ad.uid
FROM bilibili_metadata ad
LEFT JOIN bilibili_user bu ON ad.uid = bu.uid
WHERE bu.uid IS NULL;
`,
[],
);
const rows = queryResult.rows;
return rows.map((row) => row.uid);
}

lib/db/init.ts (new file, 6 changes)

@@ -0,0 +1,6 @@
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
const pool = new Pool(postgresConfig, 12);
export const db = pool;

lib/db/pgConfig.ts (new file, 21 changes)

@@ -0,0 +1,21 @@
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"];
const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined);
if (unsetVars.length > 0) {
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
}
const databaseHost = Deno.env.get("DB_HOST")!;
const databaseName = Deno.env.get("DB_NAME")!;
const databaseUser = Deno.env.get("DB_USER")!;
const databasePassword = Deno.env.get("DB_PASSWORD")!;
const databasePort = Deno.env.get("DB_PORT")!;
export const postgresConfig = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseName,
user: databaseUser,
password: databasePassword,
};

lib/db/redis.ts (new file, 3 changes)

@@ -0,0 +1,3 @@
import { Redis } from "ioredis";
export const redis = new Redis({ maxRetriesPerRequest: null });

lib/db/schema.d.ts (vendored, new file, 33 changes)

@@ -0,0 +1,33 @@
export interface AllDataType {
id: number;
aid: number;
bvid: string | null;
description: string | null;
uid: number | null;
tags: string | null;
title: string | null;
published_at: string | null;
duration: number;
created_at: string | null;
}
export interface BiliUserType {
id: number;
uid: number;
username: string;
desc: string;
fans: number;
}
export interface VideoSnapshotType {
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
aid: bigint;
replies: number;
}

lib/db/snapshot.ts (new file, 198 changes)

@@ -0,0 +1,198 @@
import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { VideoSnapshotType } from "lib/db/schema.d.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function getSongsNearMilestone(client: Client) {
const queryResult = await client.queryObject<VideoSnapshotType>(`
WITH filtered_snapshots AS (
SELECT
vs.*
FROM
video_snapshot vs
WHERE
(vs.views >= 90000 AND vs.views < 100000) OR
(vs.views >= 900000 AND vs.views < 1000000)
),
ranked_snapshots AS (
SELECT
fs.*,
ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn,
MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid
FROM
filtered_snapshots fs
INNER JOIN
songs s ON fs.aid = s.aid
)
SELECT
rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies
FROM
ranked_snapshots rs
WHERE
rs.rn = 1;
`);
return queryResult.rows.map((row) => {
return {
...row,
aid: Number(row.aid),
};
});
}
export async function getUnsnapshotedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: bigint }>(`
SELECT DISTINCT s.aid
FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`);
return queryResult.rows.map((row) => Number(row.aid));
}
export async function getSongSnapshotCount(client: Client, aid: number) {
const queryResult = await client.queryObject<{ count: number }>(
`
SELECT COUNT(*) AS count
FROM video_snapshot
WHERE aid = $1;
`,
[aid],
);
return queryResult.rows[0].count;
}
export async function getShortTermEtaPrediction(client: Client, aid: number) {
const queryResult = await client.queryObject<{ eta: number }>(
`
WITH old_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 AND
NOW() - created_at > '20 min'
ORDER BY created_at DESC
LIMIT 1
),
new_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
)
SELECT
CASE
WHEN n.views > 100000
THEN
(1000000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
ELSE
(100000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
END AS eta
FROM old_snapshot o, new_snapshot n;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return queryResult.rows[0].eta;
}
export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) {
const queryResult = await client.queryObject<{ interval: number }>(
`
SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return queryResult.rows[0].interval;
}
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
const count = await getSongSnapshotCount(client, aid);
if (count < 2) {
return true;
}
const queryResult = await client.queryObject<
{ views1: number; created_at1: string; views2: number; created_at2: string }
>(
`
WITH latest_snapshot AS (
SELECT
aid,
views,
created_at
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
),
pairs AS (
SELECT
a.views AS views1,
a.created_at AS created_at1,
b.views AS views2,
b.created_at AS created_at2,
(b.created_at - a.created_at) AS interval
FROM video_snapshot a
JOIN latest_snapshot b
ON a.aid = b.aid
AND a.created_at < b.created_at
)
SELECT
views1,
created_at1,
views2,
created_at2
FROM (
SELECT
*,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
) AS rn
FROM pairs
) ranked
WHERE rn = 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return true;
}
const recentViewsData = queryResult.rows[0];
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
const intervalSec = (time2 - time1) / SECOND;
const views1 = recentViewsData.views1;
const views2 = recentViewsData.views2;
const viewsDiff = views2 - views1;
if (viewsDiff == 0) {
return false;
}
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
const expectedViewsDiff = nextMilestone - views2;
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
return expectedIntervalSec <= 3 * DAY;
}

lib/db/songs.ts (new file, 29 changes)

@@ -0,0 +1,29 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
export async function getNotCollectedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(`
SELECT lr.aid
FROM labelling_result lr
WHERE lr.label != 0
AND NOT EXISTS (
SELECT 1
FROM songs s
WHERE s.aid = lr.aid
);
`);
return queryResult.rows.map((row) => row.aid);
}
export async function aidExistsInSongs(client: Client, aid: number) {
const queryResult = await client.queryObject<{ exists: boolean }>(
`
SELECT EXISTS (
SELECT 1
FROM songs
WHERE aid = $1
);
`,
[aid],
);
return queryResult.rows[0].exists;
}

@@ -1,5 +1,5 @@
import winston, { format, transports } from "winston";
import type { TransformableInfo } from "logform";
import winston, { format, transports } from "npm:winston";
import { TransformableInfo } from "npm:logform";
import chalk from "chalk";
const customFormat = format.printf((info: TransformableInfo) => {
@@ -24,13 +24,13 @@ const createTransport = (level: string, filename: string) => {
let maxsize = undefined;
let maxFiles = undefined;
let tailable = undefined;
if (level === "silly") {
maxsize = 500 * MB;
maxFiles = undefined;
if (level === "verbose") {
maxsize = 10 * MB;
maxFiles = 10;
tailable = false;
} else if (level === "warn") {
maxsize = 10 * MB;
maxFiles = 5;
maxFiles = 1;
tailable = false;
}
function replacer(key: unknown, value: unknown) {
@@ -52,9 +52,9 @@ const createTransport = (level: string, filename: string) => {
});
};
const sillyLogPath = process.env["LOG_VERBOSE"] ?? "logs/verbose.log";
const warnLogPath = process.env["LOG_WARN"] ?? "logs/warn.log";
const errorLogPath = process.env["LOG_ERROR"] ?? "logs/error.log";
const sillyLogPath = Deno.env.get("LOG_VERBOSE") ?? "logs/verbose.log";
const warnLogPath = Deno.env.get("LOG_WARN") ?? "logs/warn.log";
const errorLogPath = Deno.env.get("LOG_ERROR") ?? "logs/error.log";
const winstonLogger = winston.createLogger({
levels: winston.config.npm.levels,
@@ -62,7 +62,7 @@ const winstonLogger = winston.createLogger({
new transports.Console({
level: "debug",
format: format.combine(
format.timestamp({ format: "YYYY-MM-DD HH:mm:ss.SSS" }),
format.timestamp({ format: "HH:mm:ss.SSS" }),
format.colorize(),
format.errors({ stack: true }),
customFormat,

@@ -1,4 +1,4 @@
import logger from "@core/log/logger.ts";
import logger from "lib/log/logger.ts";
logger.error(Error("test error"), "test service");
logger.debug(`some string`);

lib/ml/benchmark.ts (new file, 173 changes)

@@ -0,0 +1,173 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";
// Configuration
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingPath = "./model/embedding_original.onnx";
const testDataPath = "./data/filter/test1.jsonl";
// Initialize inference sessions
const [sessionClassifier, sessionEmbedding] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingPath),
]);
let tokenizer: PreTrainedTokenizer;
// Initialize the tokenizer
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// New embedding generator (using ONNX)
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// Build the input parameters
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// Prepare the ONNX inputs
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// Run inference
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// Classification inference
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// Metric computation
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
speed: string;
} {
// Print the list of indices where label and prediction differ
const arr = [];
for (let i = 0; i < labels.length; i++) {
if (labels[i] !== predictions[i] && predictions[i] == 0) {
arr.push([i + 1, labels[i], predictions[i]]);
}
}
console.log(arr);
// Initialize the confusion matrix
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// Fill the matrix
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// Compute the metrics
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
// Compute class-0 precision
const class0TP = matrix[0][0];
const class0FP = matrix.flatMap((row, i) => i === 0 ? [] : [row[0]]).reduce((a, b) => a + b, 0);
const class0Precision = class0TP / (class0TP + class0FP) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
"Class 0 Prec": class0Precision,
};
}
// Refactored evaluation function
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
}> {
const data = await Deno.readTextFile(testDataPath);
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// Main
async function main() {
await loadTokenizer();
const metrics = await evaluateModel(sessionEmbedding);
console.log("Model Metrics:");
console.table(metrics);
}
await main();

@@ -0,0 +1,99 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import logger from "lib/log/logger.ts";
import { WorkerError } from "lib/mq/schema.ts";
const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingOriginalPath = "./model/model.onnx";
export const modelVersion = "3.17";
let sessionClassifier: ort.InferenceSession | null = null;
let sessionEmbedding: ort.InferenceSession | null = null;
let tokenizer: PreTrainedTokenizer | null = null;
export async function initializeModels() {
if (tokenizer && sessionClassifier && sessionEmbedding) {
return;
}
try {
tokenizer = await AutoTokenizer.from_pretrained(tokenizerModel);
const [classifierSession, embeddingSession] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
]);
sessionClassifier = classifierSession;
sessionEmbedding = embeddingSession;
logger.log("Filter models initialized", "ml");
} catch (error) {
throw new WorkerError(error as Error, "ml", "fn:initializeModels");
}
}
export function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
if (!tokenizer) {
throw new Error("Tokenizer is not initialized. Call initializeModels() first.");
}
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
async function runClassification(embeddings: number[]): Promise<number[]> {
if (!sessionClassifier) {
throw new Error("Classifier session is not initialized. Call initializeModels() first.");
}
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
export async function classifyVideo(
title: string,
description: string,
tags: string,
aid: number,
): Promise<number> {
if (!sessionEmbedding) {
throw new Error("Embedding session is not initialized. Call initializeModels() first.");
}
const embeddings = await getONNXEmbeddings([
title,
description,
tags,
], sessionEmbedding);
const probabilities = await runClassification(embeddings);
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
return probabilities.indexOf(Math.max(...probabilities));
}

lib/ml/quant_benchmark.ts (new file, 165 changes)

@@ -0,0 +1,165 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
import { softmax } from "lib/ml/filter_inference.ts";
// Configuration
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
const onnxEmbeddingOriginalPath = "./model/embedding_original.onnx";
const onnxEmbeddingQuantizedPath = "./model/embedding_original.onnx";
// Initialize inference sessions
const [sessionClassifier, sessionEmbeddingOriginal, sessionEmbeddingQuantized] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
ort.InferenceSession.create(onnxEmbeddingQuantizedPath),
]);
let tokenizer: PreTrainedTokenizer;
// Initialize the tokenizer
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// New embedding generator (using ONNX)
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// Build the input parameters
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// Prepare the ONNX inputs
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// Run inference
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// Classification inference
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 4, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// Metric computation
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
speed: string;
} {
// Initialize the confusion matrix
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// Fill the matrix
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// Accumulate per-class TP / FP / FN
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
};
}
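// Note: with micro-averaging on a single-label task, total FP equals total FN,
// so precision, recall and F1 all collapse to accuracy; accuracy and speed are
// the most informative columns in the tables printed below.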
// Refactored evaluation function
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
}> {
const data = await Deno.readTextFile("./data/filter/test1.jsonl");
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
sample.author_info,
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// Main entry point
async function main() {
await loadTokenizer();
// Evaluate the original model
const originalMetrics = await evaluateModel(sessionEmbeddingOriginal);
console.log("Original Model Metrics:");
console.table(originalMetrics);
// Evaluate the quantized model
const quantizedMetrics = await evaluateModel(sessionEmbeddingQuantized);
console.log("Quantized Model Metrics:");
console.table(quantizedMetrics);
}
await main();
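// Assumed invocation (adjust permissions to your setup):
//   deno run --allow-read --allow-ffi lib/ml/quant_benchmark.ts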

View File

@ -0,0 +1,67 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "lib/db/allData.ts";
import { classifyVideo } from "lib/ml/filter_inference.ts";
import { ClassifyVideoQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { aidExistsInSongs } from "lib/db/songs.ts";
import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
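// Process a single video: fetch its metadata, classify it, persist the label,
// and insert it into the songs table when it is not labelled 0 ("not a song").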
export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect();
const aid = job.data.aid;
if (!aid) {
return 3;
}
const videoInfo = await getVideoInfoFromAllData(client, aid);
const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A";
const tags = videoInfo.tags?.trim() || "empty";
const label = await classifyVideo(title, description, tags, aid);
if (label === -1) {
logger.warn(`Failed to classify video ${aid}`, "ml");
}
await insertVideoLabel(client, aid, label);
const exists = await aidExistsInSongs(client, aid);
if (!exists && label !== 0) {
await insertIntoSongs(client, aid);
}
client.release();
await job.updateData({
...job.data,
label: label,
});
return 0;
};
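// Batch scheduler: enqueues up to ~200 unlabelled videos per run, guarded by a
// distributed lock so overlapping runs do not double-enqueue.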
export const classifyVideosWorker = async () => {
if (await lockManager.isLocked("classifyVideos")) {
logger.log("job:classifyVideos is locked, skipping.", "mq");
return;
}
await lockManager.acquireLock("classifyVideos");
const client = await db.connect();
const videos = await getUnlabelledVideos(client);
logger.log(`Found ${videos.length} unlabelled videos`);
client.release();
let i = 0;
for (const aid of videos) {
if (i > 200) {
await lockManager.releaseLock("classifyVideos");
return 10000 + i;
}
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
i++;
}
await lockManager.releaseLock("classifyVideos");
return 0;
};

View File

@ -0,0 +1,37 @@
import { Job } from "bullmq";
import { queueLatestVideos } from "lib/mq/task/queueLatestVideo.ts";
import { db } from "lib/db/init.ts";
import { insertVideoInfo } from "lib/mq/task/getVideoDetails.ts";
import { collectSongs } from "lib/mq/task/collectSongs.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
const client = await db.connect();
try {
await queueLatestVideos(client);
} finally {
client.release();
}
};
export const collectSongsWorker = async (_job: Job): Promise<void> => {
const client = await db.connect();
try {
await collectSongs(client);
} finally {
client.release();
}
};
export const getVideoInfoWorker = async (job: Job): Promise<number> => {
const client = await db.connect();
try {
const aid = job.data.aid;
if (!aid) {
return 3;
}
await insertVideoInfo(client, aid);
return 0;
} finally {
client.release();
}
};

227
lib/mq/exec/snapshotTick.ts Normal file
View File

@ -0,0 +1,227 @@
import { Job } from "bullmq";
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { db } from "lib/db/init.ts";
import {
getIntervalFromLastSnapshotToNow,
getShortTermEtaPrediction,
getSongsNearMilestone,
getUnsnapshotedSongs,
songEligibleForMilestoneSnapshot,
} from "lib/db/snapshot.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { redis } from "lib/db/redis.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import logger from "lib/log/logger.ts";
import { formatSeconds } from "lib/utils/formatSeconds.ts";
import { truncate } from "lib/utils/truncate.ts";
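// A Redis key (cvsa:snapshot:{aid}) marks videos that already have a snapshot
// job queued, so the tick worker does not schedule duplicates.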
async function snapshotScheduled(aid: number) {
try {
return await redis.exists(`cvsa:snapshot:${aid}`);
} catch {
logger.error(`Failed to check scheduled status for ${aid}`, "mq");
return false;
}
}
async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
try {
if (value) {
await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
} else {
await redis.del(`cvsa:snapshot:${aid}`);
}
} catch {
logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
}
}
interface SongNearMilestone {
aid: number;
id: number;
created_at: string;
views: number;
coins: number;
likes: number;
favorites: number;
shares: number;
danmakus: number;
replies: number;
}
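// Schedule high-priority snapshots for songs near a milestone, pacing the
// queue in batches of 8 with a 2-second delay step between batches.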
async function processMilestoneSnapshots(client: Client, videosNearMilestone: SongNearMilestone[]) {
let i = 0;
for (const snapshot of videosNearMilestone) {
if (await snapshotScheduled(snapshot.aid)) {
continue;
}
const timeFromLastSnapshot = await getIntervalFromLastSnapshotToNow(client, snapshot.aid);
const lastSnapshotLessThan8Hrs = timeFromLastSnapshot && timeFromLastSnapshot * SECOND < 8 * HOUR;
const notEligible = await songEligibleForMilestoneSnapshot(client, snapshot.aid);
if (notEligible && lastSnapshotLessThan8Hrs) {
continue;
}
const factor = Math.floor(i / 8);
const delayTime = factor * SECOND * 2;
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: snapshot.aid,
currentViews: snapshot.views,
snapshotedAt: snapshot.created_at,
}, { delay: delayTime, priority: 1 });
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
i++;
}
}
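// Schedule ordinary snapshots for videos that have none yet, in batches of 5
// with a 4-second delay step, marking each as scheduled for 6 hours.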
async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
let i = 0;
for (const aid of unsnapshotedVideos) {
if (await snapshotScheduled(aid)) {
logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
continue;
}
const factor = Math.floor(i / 5);
const delayTime = factor * SECOND * 4;
await SnapshotQueue.add("snapshotVideo", {
aid,
}, { delay: delayTime, priority: 3 });
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
i++;
}
}
export const snapshotTickWorker = async (_job: Job) => {
const client = await db.connect();
try {
const videosNearMilestone = await getSongsNearMilestone(client);
await processMilestoneSnapshots(client, videosNearMilestone);
const unsnapshotedVideos = await getUnsnapshotedSongs(client);
await processUnsnapshotedVideos(unsnapshotedVideos);
} finally {
client.release();
}
};
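// Logarithm with an arbitrary base (default 10) via the change-of-base formula.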
const log = (a: number, b: number = 10) => Math.log(a) / Math.log(b);
export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
try {
const aid: number = job.data.aid;
const currentViews: number = job.data.currentViews;
const lastSnapshoted: string = job.data.snapshotedAt;
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
if (typeof stat === "number") {
if (stat === -404 || stat === 62002 || stat === 62012) {
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
} else {
await setSnapshotScheduled(aid, false, 0);
}
return;
}
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
if (stat.views >= nextMilestone) {
await setSnapshotScheduled(aid, false, 0);
return;
}
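// Adaptive scheduling: if the database has no short-term ETA, estimate one
// from the view increment since the last snapshot (DELTA guards against
// division by zero), then book the next snapshot at eta / factor so sampling
// becomes denser as the milestone approaches.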
let eta = await getShortTermEtaPrediction(client, aid);
let factor = 3;
if (eta === null) {
const DELTA = 0.001;
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
const viewsIncrement = stat.views - currentViews;
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
const viewsToIncrease = nextMilestone - stat.views;
eta = viewsToIncrease / (incrementSpeed + DELTA);
factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
}
const scheduledNextSnapshotDelay = eta * SECOND / factor;
const maxInterval = 60 * MINUTE;
const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay, priority: 1 });
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
etaInMins: eta / 60,
});
logger.log(
`Scheduled next milestone snapshot for ${aid} in ${
formatSeconds(delay / 1000)
}, current views: ${stat.views}`,
"mq",
);
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForMilestoneVideoWorker",
);
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid: job.data.aid,
currentViews: job.data.currentViews,
snapshotedAt: job.data.snapshotedAt,
}, { delay: 5 * SECOND, priority: 1 });
return;
}
throw e;
} finally {
client.release();
}
};
export const takeSnapshotForVideoWorker = async (job: Job) => {
const client = await db.connect();
await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
try {
const { aid } = job.data;
const stat = await insertVideoStats(client, aid, "getVideoInfo");
if (typeof stat === "number") {
if (stat === -404 || stat === 62002 || stat === 62012) {
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
} else {
await setSnapshotScheduled(aid, false, 0);
}
return;
}
logger.log(`Taken snapshot for ${aid}`, "mq");
if (stat == null) {
await setSnapshotScheduled(aid, false, 0);
return;
}
await job.updateData({
...job.data,
updatedViews: stat.views,
updatedTime: new Date(stat.time).toISOString(),
});
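// Escalate to the high-priority milestone flow once views are within 10%
// below the 100k or 1M milestone.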
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
(stat.views >= 900000 && stat.views < 1000000);
if (nearMilestone) {
await SnapshotQueue.add("snapshotMilestoneVideo", {
aid,
currentViews: stat.views,
snapshotedAt: stat.time,
}, { delay: 0, priority: 1 });
}
await setSnapshotScheduled(aid, false, 0);
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
await setSnapshotScheduled(job.data.aid, false, 0);
return;
}
throw e;
} finally {
client.release();
}
};

1
lib/mq/executors.ts Normal file
View File

@ -0,0 +1 @@
export * from "lib/mq/exec/getLatestVideos.ts";

7
lib/mq/index.ts Normal file
View File

@ -0,0 +1,7 @@
import { Queue } from "bullmq";
export const LatestVideosQueue = new Queue("latestVideos");
export const ClassifyVideoQueue = new Queue("classifyVideo");
export const SnapshotQueue = new Queue("snapshot");
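// No connection options are passed here, so BullMQ falls back to its default
// Redis connection (typically localhost:6379) unless configured elsewhere.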

24
lib/mq/init.ts Normal file
View File

@ -0,0 +1,24 @@
import { MINUTE } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
export async function initMQ() {
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
immediately: true,
});
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE,
immediately: true,
});
await LatestVideosQueue.upsertJobScheduler("collectSongs", {
every: 3 * MINUTE,
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("scheduleSnapshotTick", {
every: 3 * MINUTE,
immediately: true,
});
logger.log("Message queue initialized.");
}
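// The schedulers above only enqueue repeatable jobs; BullMQ Workers bound to
// the same queue names must consume them. A minimal sketch (illustrative, not
// part of this commit), assuming the worker functions from lib/mq/exec/:
//
//   import { Worker } from "bullmq";
//   import { getLatestVideosWorker } from "lib/mq/executors.ts";
//   new Worker("latestVideos", async (job) => {
//       if (job.name === "getLatestVideos") await getLatestVideosWorker(job);
//   });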

View File

@ -1,5 +1,5 @@
import { Redis } from "ioredis";
import { redis } from "@core/db/redis.ts";
import { redis } from "lib/db/redis.ts";
class LockManager {
private redis: Redis;

Some files were not shown because too many files have changed in this diff.