Compare commits

...

62 Commits

Author SHA1 Message Date
92c3c8eefe
update: dependency version 2025-06-10 15:08:08 +08:00
497ea031d8
fix: missing field in db schema 2025-06-10 15:03:39 +08:00
39ca394a56
fix: several type-related bugs 2025-06-10 14:46:28 +08:00
0bd1771f35
improve: date formatting in frontend 2025-06-10 14:38:55 +08:00
328c73c209
fix: 'close' is missing in DialogButtonGroup in ErrorDialog 2025-06-08 18:24:15 +08:00
5ac952ec13
merge: branch 'feat/frontend' into main 2025-06-08 18:09:49 +08:00
2cf5923b28
add: logout in frontend 2025-06-08 18:06:46 +08:00
75973c72ee
add: login, user profile page & license 2025-06-07 19:42:36 +08:00
b40d24721c
add: milestone monitoring for videos close to N million views 2025-06-06 21:37:55 +08:00
0a6ecc6314
add: route for getting videos 2025-06-06 21:03:52 +08:00
b4a0320e3e
version: cvsa/3.15.34 2025-06-06 17:40:32 +08:00
8cf9395354
fix: don't adjust start time for schedule when no proxy is available 2025-06-06 17:11:44 +08:00
1e8d28e194
fix: incorrectly ignored type when collecting videos for archive snapshots 2025-06-06 16:52:27 +08:00
c0340677a1
merge: pull request #8 from ref/next 2025-06-05 22:31:06 +08:00
54a2de0a11
Merge branch 'main' into ref/next 2025-06-05 22:30:19 +08:00
3abd6666c0
add: missing dependency in backend 2025-06-02 05:33:50 +08:00
44e13724fc
fix: missing dependency in backend 2025-06-02 05:17:46 +08:00
dd7e2242a0
add: docker support for nextjs frontend 2025-06-02 05:04:26 +08:00
503a93a09f
update: .tokeignore 2025-06-02 04:06:56 +08:00
507f2c331e
add: several pages, synced with old frontend 2025-06-02 04:04:17 +08:00
a1a4abff46
fix: missing import in @cvsa/core 2025-06-01 21:31:37 +08:00
c6b7736dac
fix: import from the core package in next frontend 2025-06-01 18:48:50 +08:00
fa5ab258da
add: login status detection in frontend 2025-06-01 17:21:09 +08:00
9dd06fa7bc
add: set cookie after signing up 2025-06-01 16:18:01 +08:00
bb7f846305
improve: the structure of the error handling in sign up page 2025-06-01 14:36:55 +08:00
7f9563a2a6
ref: use axios for API requests
add: a useCaptcha hook
2025-06-01 01:53:06 +08:00
d0d9c21aba
update: as-needed URL prefix for i18n routing 2025-05-31 21:56:15 +08:00
44bc99dd9d
add: i18n 2025-05-31 21:29:47 +08:00
2c83b79881
update: termination condition to time-based in classifyVideosWorker 2025-05-31 12:23:01 +08:00
1a20d5afe0
update: schedule archive snapshots to next Saturday midnight
fix: no expire when acquiring lock for classifyVideos
ref: format
2025-05-31 12:13:56 +08:00
ae338f88ee
update: the error dialog 2025-05-31 11:47:45 +08:00
96903dec2b
fix: prevent scrolling of body when modal dialog is shown 2025-05-30 03:24:37 +08:00
58b4e2613c
add: button components in dialog 2025-05-28 23:01:43 +08:00
2b0497c83a
fix: incorrect timezone when adding to bilibili metadata 2025-05-27 22:40:53 +08:00
3bc72720d1
fix: remove bull-board from all scripts to prevent crashing 2025-05-27 22:35:44 +08:00
557a013b42
add: some components, fonts 2025-05-27 22:33:28 +08:00
16cfae8bad
fix: unexpectedly set prop variant of TextField to required 2025-05-25 03:15:51 +08:00
cbd46d4030
improve: code style and some subtle changes 2025-05-25 03:15:10 +08:00
6b93a781b7
add: sign up page 2025-05-24 17:13:26 +08:00
fe2fd4fe36
feat: home page with header 2025-05-24 03:08:32 +08:00
dd70543594
init: next for frontend 2025-05-22 03:15:07 +08:00
1ff71ab241
improve: code quality 2025-05-19 00:13:01 +08:00
cf7a285f57
fix: incorrect import for psql.d.ts, remove unnecessary benchmark files 2025-05-19 00:11:53 +08:00
79a37d927a
fix: forgot to specify type when collecting videos without active schedules;
missing return when eta too long for milestone snapshot
2025-05-19 00:10:33 +08:00
f003e77d52
add: text fields and button in signup page 2025-05-18 20:55:51 +08:00
4addadb035
update: the style in content.css 2025-05-18 15:34:12 +08:00
23917b2976
merge: branch 'feat/backend' into main 2025-05-18 03:53:22 +08:00
6d946f74df
update: rate limiter 2025-05-18 03:52:42 +08:00
c5ba673069
fix: several bugs in captcha rate limiting 2025-05-18 03:28:27 +08:00
fa5ccce83f
fix: incorrect import of type Psql 2025-05-18 00:36:39 +08:00
7786d66dbb
add: complete guarding under uCaptcha 2025-05-18 00:33:43 +08:00
b18b45078f
add: tokenID added to JWT in endpoint GET /captcha/:id/result 2025-05-17 02:15:05 +08:00
1633e56b1e
update: returns JWT in the /captcha/:id/result endpoint 2025-05-11 19:40:24 +08:00
a063f2401b
update: rename the captcha endpoint 2025-05-11 19:14:19 +08:00
44f68993a0
fix: missing env during building in frontend 2025-05-11 16:20:22 +08:00
980dd542ee
add: video info page in frontend 2025-05-11 03:57:26 +08:00
137c19d74e
ref: move from sliding window to token bucket in rate limiter 2025-05-11 01:50:02 +08:00
5fb1355346
ref: rename endpoint to POST /validation/session 2025-05-10 22:41:10 +08:00
8456bb7485
add: route for validation 2025-05-10 21:48:02 +08:00
01f5e57864
ref: dir structure of backend package 2025-05-10 00:20:20 +08:00
bf00918c00
merge: branch 'main' into feat/backend 2025-05-10 00:19:55 +08:00
2772849933
update: .gitignore 2025-04-29 04:52:58 +08:00
271 changed files with 16520 additions and 1969 deletions

2
.gitignore vendored
View File

@@ -40,3 +40,5 @@ dist/
build/
docker-compose.yml
ucaptcha-config.yaml

View File

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="TypeScriptCompiler">
<option name="useTypesFromServer" value="true" />
</component>
</project>

View File

@@ -28,6 +28,8 @@
<excludeFolder url="file://$MODULE_DIR$/packages/crawler/logs" />
<excludeFolder url="file://$MODULE_DIR$/data" />
<excludeFolder url="file://$MODULE_DIR$/redis" />
<excludeFolder url="file://$MODULE_DIR$/ml" />
<excludeFolder url="file://$MODULE_DIR$/src" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />

View File

@@ -6,3 +6,6 @@ data
*config*
Inter.css
MiSans.css
*.yaml
*.yml
*.mdx

View File

@@ -1,5 +1,7 @@
FROM oven/bun
ARG BACKEND_URL
WORKDIR /app
COPY . .
@@ -12,6 +14,7 @@ RUN bun run build
ENV HOST=0.0.0.0
ENV PORT=4321
ENV BACKEND_URL=${BACKEND_URL}
EXPOSE 4321

14
Dockerfile.next Normal file
View File

@@ -0,0 +1,14 @@
FROM node:lts-slim AS production
WORKDIR /app
COPY ./packages/next/.next ./.next
COPY ./packages/next/public ./public
COPY ./packages/next/package.json ./package.json
COPY ./packages/next/node_modules ./node_modules
ENV NODE_ENV production
EXPOSE 7400
CMD ["npm", "start"]

506
bun.lock

File diff suppressed because it is too large Load Diff

View File

@@ -1,16 +0,0 @@
{
"lock": false,
"workspace": ["./packages/crawler", "./packages/core"],
"nodeModulesDir": "auto",
"tasks": {
"crawler": "deno task --filter 'crawler' all",
"backend": "deno task --filter 'backend' start"
},
"fmt": {
"useTabs": true,
"lineWidth": 120,
"indentWidth": 4,
"semiColons": true,
"proseWrap": "always"
}
}

View File

@@ -1,6 +1,6 @@
{
"name": "cvsa",
"version": "2.13.22",
"version": "3.15.34",
"private": false,
"type": "module",
"workspaces": [
@@ -10,9 +10,12 @@
"packages/crawler"
],
"dependencies": {
"arg": "^5.0.2",
"postgres": "^3.4.5"
},
"devDependencies": {
"@types/bun": "^1.2.15",
"prettier": "^3.5.3",
"vite-tsconfig-paths": "^5.1.4",
"vitest": "^3.1.2",
"vitest-tsconfig-paths": "^3.4.1"

View File

@@ -1,46 +0,0 @@
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "DB_NAME_CRED"];
const unsetVars = requiredEnvVars.filter((key) => process.env[key] === undefined);
if (unsetVars.length > 0) {
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
}
const databaseHost = process.env["DB_HOST"]!;
const databaseName = process.env["DB_NAME"];
const databaseNameCred = process.env["DB_NAME_CRED"]!;
const databaseUser = process.env["DB_USER"]!;
const databasePassword = process.env["DB_PASSWORD"]!;
const databasePort = process.env["DB_PORT"]!;
export const postgresConfig = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseName,
user: databaseUser,
password: databasePassword
};
export const postgresConfigNpm = {
host: databaseHost,
port: parseInt(databasePort),
database: databaseName,
username: databaseUser,
password: databasePassword
};
export const postgresCredConfigNpm = {
host: databaseHost,
port: parseInt(databasePort),
database: databaseNameCred,
username: databaseUser,
password: databasePassword
}
export const postgresConfigCred = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseNameCred,
user: databaseUser,
password: databasePassword
};

View File

@@ -1,5 +0,0 @@
import postgres from "postgres";
import { postgresConfigNpm, postgresCredConfigNpm } from "./config";
export const sql = postgres(postgresConfigNpm);
export const sqlCred = postgres(postgresCredConfigNpm)

View File

@@ -0,0 +1,13 @@
import { sql } from "@core/db/dbNew";
import type { LatestSnapshotType } from "@core/db/schema.d.ts";
export async function getVideosInViewsRange(minViews: number, maxViews: number) {
return sql<LatestSnapshotType[]>`
SELECT *
FROM latest_video_snapshot
WHERE views >= ${minViews}
AND views <= ${maxViews}
ORDER BY views DESC
LIMIT 5000
`;
}
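
A minimal usage sketch of the new helper (not part of the diff; the aid field is read the same way the /videos handler later in this diff reads it):

import { getVideosInViewsRange } from "@/db/latestSnapshots";

// Videos whose latest snapshot sits between 900k and 1M views,
// capped at 5000 rows, highest views first.
const rows = await getVideosInViewsRange(900_000, 1_000_000);
console.log(rows.length, rows[0]?.aid);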

View File

@@ -1,4 +1,4 @@
import { sql } from "./db";
import { sql } from "@core/db/dbNew";
import type { VideoSnapshotType } from "@core/db/schema.d.ts";
export async function getVideoSnapshots(

View File

@@ -0,0 +1,62 @@
import { Psql } from "@core/db/psql";
import { SlidingWindow } from "@core/mq/slidingWindow.ts";
import { redis } from "@core/db/redis.ts";
import { getIdentifier } from "@/middleware/rateLimiters.ts";
import { Context } from "hono";
type seconds = number;
export interface CaptchaDifficultyConfig {
global: boolean;
duration: seconds;
threshold: number;
difficulty: number;
}
export const getCaptchaDifficultyConfigByRoute = async (sql: Psql, route: string): Promise<CaptchaDifficultyConfig[]> => {
return sql<CaptchaDifficultyConfig[]>`
SELECT duration, threshold, difficulty, global
FROM captcha_difficulty_settings
WHERE CONCAT(method, '-', path) = ${route}
ORDER BY duration
`;
};
export const getCaptchaConfigMaxDuration = async (sql: Psql, route: string): Promise<seconds> => {
const rows = await sql<{max: number}[]>`
SELECT MAX(duration)
FROM captcha_difficulty_settings
WHERE CONCAT(method, '-', path) = ${route}
`;
if (rows.length < 1){
return Number.MAX_SAFE_INTEGER;
}
return rows[0].max;
}
export const getCurrentCaptchaDifficulty = async (sql: Psql, c: Context | string): Promise<number | null> => {
const isRoute = typeof c === "string";
const route = isRoute ? c : `${c.req.method}-${c.req.path}`
const configs = await getCaptchaDifficultyConfigByRoute(sql, route);
if (configs.length < 1) {
return null
}
else if (configs.length == 1) {
return configs[0].difficulty
}
const maxDuration = configs.reduce((max, config) =>
Math.max(max, config.duration), 0);
const slidingWindow = new SlidingWindow(redis, maxDuration);
for (let i = 1; i < configs.length; i++) {
const config = configs[i];
const lastConfig = configs[i - 1];
const identifier = isRoute ? c : getIdentifier(c, config.global);
const count = await slidingWindow.count(`captcha-${identifier}`, config.duration);
if (count >= config.threshold) {
continue;
}
return lastConfig.difficulty
}
return configs[configs.length-1].difficulty;
}
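
To make the escalation logic concrete, a worked example with hypothetical captcha_difficulty_settings rows (values are illustrative, not from the repository):

// Rows for "POST-/user", ordered by duration as the query guarantees:
//   { duration:  300, threshold: 10, difficulty: 1 }
//   { duration: 3600, threshold: 50, difficulty: 2 }
//
// getCurrentCaptchaDifficulty sizes one sliding window to the longest
// duration (3600 s), then walks the configs starting from the second:
//   fewer than 50 events in the last 3600 s -> difficulty 1 (previous config)
//   50 or more events in the last 3600 s    -> difficulty 2 (last config)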

View File

@@ -0,0 +1,14 @@
import { ErrorResponse } from "src/schema";
export const getJWTsecret = () => {
const secret = process.env["JWT_SECRET"];
if (!secret) {
const response: ErrorResponse = {
message: "JWT_SECRET is not set",
code: "SERVER_ERROR",
errors: []
};
return [response, true];
}
return [secret, null];
};

View File

@@ -0,0 +1,120 @@
import { Context, Next } from "hono";
import { ErrorResponse } from "src/schema";
import { SlidingWindow } from "@core/mq/slidingWindow.ts";
import { getCaptchaConfigMaxDuration, getCurrentCaptchaDifficulty } from "@/lib/auth/captchaDifficulty.ts";
import { sqlCred } from "@core/db/dbNew.ts";
import { redis } from "@core/db/redis.ts";
import { verify } from "hono/jwt";
import { JwtTokenInvalid, JwtTokenExpired } from "hono/utils/jwt/types";
import { getJWTsecret } from "@/lib/auth/getJWTsecret.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { object, string, number, ValidationError } from "yup";
import { getIdentifier } from "@/middleware/rateLimiters.ts";
const tokenSchema = object({
exp: number().integer(),
id: string().length(6),
difficulty: number().integer().moreThan(0)
});
export const captchaMiddleware = async (c: Context, next: Next) => {
const authHeader = c.req.header("Authorization");
if (!authHeader) {
const response: ErrorResponse = {
message: "'Authorization' header is missing.",
code: "UNAUTHORIZED",
errors: []
};
return c.json<ErrorResponse>(response, 401);
}
const authIsBearer = authHeader.startsWith("Bearer ");
if (!authIsBearer || authHeader.length < 8) {
const response: ErrorResponse = {
message: "'Authorization' header is invalid.",
code: "INVALID_HEADER",
errors: []
};
return c.json<ErrorResponse>(response, 400);
}
const [r, err] = getJWTsecret();
if (err) {
return c.json<ErrorResponse>(r as ErrorResponse, 500);
}
const jwtSecret = r as string;
const token = authHeader.substring(7);
const path = c.req.path;
const method = c.req.method;
const route = `${method}-${path}`;
const requiredDifficulty = await getCurrentCaptchaDifficulty(sqlCred, c);
try {
const decodedPayload = await verify(token, jwtSecret);
const payload = await tokenSchema.validate(decodedPayload);
const difficulty = payload.difficulty;
const tokenID = payload.id;
const consumed = await lockManager.isLocked(tokenID);
if (consumed) {
const response: ErrorResponse = {
message: "Token has already been used.",
code: "INVALID_CREDENTIALS",
errors: []
};
return c.json<ErrorResponse>(response, 401);
}
if (difficulty < requiredDifficulty) {
const response: ErrorResponse = {
message: "Token too weak.",
code: "UNAUTHORIZED",
errors: []
};
return c.json<ErrorResponse>(response, 401);
}
const EXPIRE_FIVE_MINUTES = 300;
await lockManager.acquireLock(tokenID, EXPIRE_FIVE_MINUTES);
} catch (e) {
if (e instanceof JwtTokenInvalid) {
const response: ErrorResponse = {
message: "Failed to verify the token.",
code: "INVALID_CREDENTIALS",
errors: []
};
return c.json<ErrorResponse>(response, 400);
} else if (e instanceof JwtTokenExpired) {
const response: ErrorResponse = {
message: "Token expired.",
code: "INVALID_CREDENTIALS",
errors: []
};
return c.json<ErrorResponse>(response, 400);
} else if (e instanceof ValidationError) {
const response: ErrorResponse = {
code: "INVALID_QUERY_PARAMS",
message: "Invalid query parameters",
errors: e.errors
};
return c.json<ErrorResponse>(response, 400);
} else {
const response: ErrorResponse = {
message: "Unknown error.",
code: "UNKNOWN_ERROR",
errors: []
};
return c.json<ErrorResponse>(response, 500);
}
}
const duration = await getCaptchaConfigMaxDuration(sqlCred, route);
const window = new SlidingWindow(redis, duration);
const identifierWithIP = getIdentifier(c, true);
const identifier = getIdentifier(c, false);
await window.event(`captcha-${identifier}`);
await window.event(`captcha-${identifierWithIP}`);
await next();
};
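
For orientation, a hedged sketch of the client flow this middleware expects, stitched together from the captcha routes elsewhere in this diff (the base URL and solveChallenge are hypothetical; solving the uCaptcha proof of work is out of scope here):

const API = "https://api.example.com"; // hypothetical base URL

// Hypothetical solver for the uCaptcha challenge parameters.
declare function solveChallenge(s: { id: string; g: string; n: string; t: number }): Promise<string>;

// 1. Create a challenge for the protected route.
const session = await fetch(`${API}/captcha/session`, {
	method: "POST",
	headers: { "Content-Type": "application/json" },
	body: JSON.stringify({ route: "POST-/user" })
}).then((r) => r.json());

// 2. Solve it client-side, then trade the answer for a five-minute JWT.
const ans = await solveChallenge(session);
const { token } = await fetch(
	`${API}/captcha/${session.id}/result?ans=${encodeURIComponent(ans)}`
).then((r) => r.json());

// 3. Present it as a Bearer token. The middleware locks the token's ID
// after verification, so each JWT authorizes exactly one request.
await fetch(`${API}/user`, {
	method: "POST",
	headers: { "Content-Type": "application/json", Authorization: `Bearer ${token}` },
	body: JSON.stringify({ username: "alice", password: "hunter2" })
});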

View File

@@ -0,0 +1,14 @@
import { cors } from "hono/cors";
import { Context, Next } from "hono";
export const corsMiddleware = async (c: Context, next: Next) => {
if (c.req.path.startsWith("/user") || c.req.path.startsWith("/login")) {
const corsMiddlewareHandler = cors({
origin: c.req.header("Origin"),
credentials: true
});
return corsMiddlewareHandler(c, next);
}
const corsMiddlewareHandler = cors();
return corsMiddlewareHandler(c, next);
};

View File

@@ -1,19 +0,0 @@
import { Hono } from "hono";
import { Variables } from "hono/types";
import { bodyLimitForPing } from "./bodyLimits.ts";
import { pingHandler } from "../routes/ping.ts";
import { registerRateLimiter } from "./rateLimiters.ts";
import { preetifyResponse } from "./preetifyResponse.ts";
import { logger } from "./logger.ts";
import { timing } from "hono/timing";
import { contentType } from "./contentType.ts";
export function configureMiddleWares(app: Hono<{Variables: Variables }>) {
app.use("*", contentType);
app.use(timing());
app.use("*", preetifyResponse);
app.use("*", logger({}));
app.post("/user", registerRateLimiter);
app.all("/ping", bodyLimitForPing, ...pingHandler);
}

View File

@@ -77,7 +77,7 @@ const defaultFormatter = (params) => {
`${methodColor} ${params.method.padEnd(6)}${reset} ${params.path}`
);
};
type Ctx = Context
type Ctx = Context;
export const logger = (config) => {
const { formatter = defaultFormatter, output = console, skipPaths = [], skip = null } = config;

View File

@@ -1,27 +1,53 @@
import { rateLimiter, Store } from "hono-rate-limiter";
import type { BlankEnv } from "hono/types";
import { MINUTE } from "@core/const/time.ts";
import { getConnInfo } from "hono/bun";
import type { Context } from "hono";
import { Context, Next } from "hono";
import { generateRandomId } from "@core/lib/randomID.ts";
import { RateLimiter } from "@koshnic/ratelimit";
import { ErrorResponse } from "@/src/schema";
import { redis } from "@core/db/redis.ts";
import { RedisStore } from "rate-limit-redis";
export const registerRateLimiter = rateLimiter<BlankEnv, "/user", {}>({
windowMs: 60 * MINUTE,
limit: 10,
standardHeaders: "draft-6",
keyGenerator: (c) => {
const info = getConnInfo(c as unknown as Context<BlankEnv, "/user", {}>);
if (!info.remote || !info.remote.address) {
return crypto.randomUUID();
export const getUserIP = (c: Context) => {
let ipAddr = null;
const info = getConnInfo(c);
if (info.remote && info.remote.address) {
ipAddr = info.remote.address;
}
const addr = info.remote.address;
const path = new URL(c.req.url).pathname;
const forwardedFor = c.req.header("X-Forwarded-For");
if (forwardedFor) {
ipAddr = forwardedFor.split(",")[0];
}
return ipAddr;
};
export const getIdentifier = (c: Context, includeIP: boolean = true) => {
let ipAddr = generateRandomId(6);
if (getUserIP(c)) {
ipAddr = getUserIP(c);
}
const path = c.req.path;
const method = c.req.method;
return `${method}-${path}@${addr}`;
},
store: new RedisStore({
// @ts-expect-error - Known issue: the `call` function is not present in @types/ioredis
sendCommand: (...args: string[]) => redis.call(...args)
}) as unknown as Store
});
const ipIdentifier = includeIP ? `@${ipAddr}` : "";
return `${method}-${path}${ipIdentifier}`;
};
export const registerRateLimiter = async (c: Context<BlankEnv, "/user", {}>, next: Next) => {
const limiter = new RateLimiter(redis);
const identifier = getIdentifier(c, true);
const { allowed, retryAfter } = await limiter.allow(identifier, {
burst: 5,
ratePerPeriod: 5,
period: 120,
cost: 1
});
if (!allowed) {
const response: ErrorResponse = {
message: `Too many requests, please retry after ${Math.round(retryAfter)} seconds.`,
code: "RATE_LIMIT_EXCEEDED",
errors: []
};
return c.json<ErrorResponse>(response, 429);
}
await next();
};
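
A note on the key shapes the new helpers produce (the IP is a documentation address; behavior read directly from the code above):

// getIdentifier(c, true)  -> "POST-/user@203.0.113.7"  (per-client bucket)
// getIdentifier(c, false) -> "POST-/user"              (route-wide bucket)
// When no client IP can be determined, a random 6-character ID stands in,
// which effectively gives that request a fresh, empty bucket.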

View File

@@ -1,15 +1,21 @@
{
"name": "backend",
"name": "@cvsa/backend",
"private": false,
"version": "0.6.0",
"scripts": {
"format": "prettier --write .",
"dev": "NODE_ENV=development bun run --hot src/main.ts",
"start": "NODE_ENV=production bun run src/main.ts"
"start": "NODE_ENV=production bun run src/main.ts",
"build": "bun build ./src/main.ts --target bun --outdir ./dist"
},
"dependencies": {
"@koshnic/ratelimit": "^1.0.3",
"@rabbit-company/argon2id": "^2.1.0",
"chalk": "^5.4.1",
"hono": "^4.7.8",
"hono-rate-limiter": "^0.4.2",
"ioredis": "^5.6.1",
"limiter": "^3.0.0",
"postgres": "^3.4.5",
"rate-limit-redis": "^4.2.0",
"yup": "^1.6.1",
@@ -18,5 +24,7 @@
"devDependencies": {
"@types/bun": "^1.2.11",
"prettier": "^3.5.3"
}
},
"main": "./dist/main.js",
"types": "./src/types.d.ts"
}

View File

@@ -0,0 +1,99 @@
import { Context } from "hono";
import { Bindings, BlankEnv } from "hono/types";
import { ErrorResponse } from "src/schema";
import { createHandlers } from "src/utils.ts";
import { sign } from "hono/jwt";
import { generateRandomId } from "@core/lib/randomID.ts";
import { getJWTsecret } from "lib/auth/getJWTsecret.ts";
interface CaptchaResponse {
success: boolean;
difficulty?: number;
error?: string;
}
const getChallengeVerificationResult = async (id: string, ans: string) => {
const baseURL = process.env["UCAPTCHA_URL"];
const url = new URL(baseURL);
url.pathname = `/challenge/${id}/validation`;
return await fetch(url.toString(), {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
y: ans
})
});
};
export const verifyChallengeHandler = createHandlers(
async (c: Context<BlankEnv & { Bindings: Bindings }, "/captcha/:id/result">) => {
const id = c.req.param("id");
const ans = c.req.query("ans");
if (!ans) {
const response: ErrorResponse = {
message: "Missing required query parameter: ans",
code: "INVALID_QUERY_PARAMS",
errors: []
};
return c.json<ErrorResponse>(response, 400);
}
const res = await getChallengeVerificationResult(id, ans);
const data: CaptchaResponse = await res.json();
if (data.error && res.status === 404) {
const response: ErrorResponse = {
message: data.error,
code: "ENTITY_NOT_FOUND",
i18n: {
key: "backend.error.captcha_not_found"
},
errors: []
};
return c.json<ErrorResponse>(response, 401);
} else if (data.error && res.status === 400) {
const response: ErrorResponse = {
message: data.error,
code: "INVALID_QUERY_PARAMS",
errors: []
};
return c.json<ErrorResponse>(response, 400);
} else if (data.error) {
const response: ErrorResponse = {
message: data.error,
code: "UNKNOWN_ERROR",
errors: []
};
return c.json<ErrorResponse>(response, 500);
}
if (!data.success) {
const response: ErrorResponse = {
message: "Incorrect answer",
code: "INVALID_CREDENTIALS",
errors: []
};
return c.json<ErrorResponse>(response, 401);
}
const [r, err] = getJWTsecret();
if (err) {
return c.json<ErrorResponse>(r as ErrorResponse, 500);
}
const jwtSecret = r as string;
const tokenID = generateRandomId(6);
const NOW = Math.floor(Date.now() / 1000);
const FIVE_MINUTES_LATER = NOW + 60 * 5;
const jwt = await sign(
{
difficulty: data.difficulty!,
id: tokenID,
exp: FIVE_MINUTES_LATER
},
jwtSecret
);
return c.json({
token: jwt
});
}
);

View File

@@ -0,0 +1,44 @@
import { createHandlers } from "src/utils.ts";
import { object, string, ValidationError } from "yup";
import { ErrorResponse } from "src/schema";
import { getCurrentCaptchaDifficulty } from "@/lib/auth/captchaDifficulty.ts";
import { sqlCred } from "@core/db/dbNew.ts";
const queryParamsSchema = object({
route: string().matches(/(?:GET|POST|PUT|PATCH|DELETE)-\/.*/g)
});
export const getCaptchaDifficultyHandler = createHandlers(async (c) => {
try {
const queryParams = await queryParamsSchema.validate(c.req.query());
const { route } = queryParams;
const difficulty = await getCurrentCaptchaDifficulty(sqlCred, route);
if (!difficulty) {
const response: ErrorResponse<unknown> = {
code: "ENTITY_NOT_FOUND",
message: "No difficulty configs found for this route.",
errors: []
};
return c.json<ErrorResponse<unknown>>(response, 404);
}
return c.json({
difficulty: difficulty
});
} catch (e: unknown) {
if (e instanceof ValidationError) {
const response: ErrorResponse = {
code: "INVALID_QUERY_PARAMS",
message: "Invalid query parameters",
errors: e.errors
};
return c.json<ErrorResponse>(response, 400);
} else {
const response: ErrorResponse<unknown> = {
code: "UNKNOWN_ERROR",
message: "Unknown error",
errors: [e]
};
return c.json<ErrorResponse<unknown>>(response, 500);
}
}
});

View File

@@ -0,0 +1,2 @@
export * from "./session/POST.ts";
export * from "./[id]/result/GET.ts";

View File

@@ -0,0 +1,51 @@
import { createHandlers } from "src/utils.ts";
import { getCurrentCaptchaDifficulty } from "@/lib/auth/captchaDifficulty.ts";
import { sqlCred } from "@core/db/dbNew.ts";
import { object, string, ValidationError } from "yup";
import { CaptchaSessionResponse, ErrorResponse } from "@/src/schema";
import type { ContentfulStatusCode } from "hono/utils/http-status";
const bodySchema = object({
route: string().matches(/(?:GET|POST|PUT|PATCH|DELETE)-\/.*/g)
});
const createNewChallenge = async (difficulty: number) => {
const baseURL = process.env["UCAPTCHA_URL"];
const url = new URL(baseURL);
url.pathname = "/challenge";
return await fetch(url.toString(), {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify({
difficulty: difficulty
})
});
};
export const createCaptchaSessionHandler = createHandlers(async (c) => {
try {
const requestBody = await bodySchema.validate(await c.req.json());
const { route } = requestBody;
const difficuly = await getCurrentCaptchaDifficulty(sqlCred, route);
const res = await createNewChallenge(difficuly);
return c.json<CaptchaSessionResponse | unknown>(await res.json(), res.status as ContentfulStatusCode);
} catch (e: unknown) {
if (e instanceof ValidationError) {
const response: ErrorResponse = {
code: "INVALID_QUERY_PARAMS",
message: "Invalid query parameters",
errors: e.errors
};
return c.json<ErrorResponse>(response, 400);
} else {
const response: ErrorResponse<unknown> = {
code: "UNKNOWN_ERROR",
message: "Unknown error",
errors: [e]
};
return c.json<ErrorResponse<unknown>>(response, 500);
}
}
});

View File

@@ -1,17 +1,29 @@
import { rootHandler } from "./root/root.ts";
import { pingHandler } from "./ping.ts";
import { getSnapshotsHanlder } from "./snapshots.ts";
import { registerHandler } from "./user.ts";
import { videoInfoHandler } from "db/videoInfo.ts";
import { Hono } from "hono";
import { Variables } from "hono/types";
import { getSingerForBirthday, pickSinger, pickSpecialSinger, type Singer } from "lib/const/singers.ts";
import { VERSION } from "src/main.ts";
import { createHandlers } from "src/utils.ts";
export function configureRoutes(app: Hono<{Variables: Variables }>) {
app.get("/", ...rootHandler);
app.all("/ping", ...pingHandler);
app.get("/video/:id/snapshots", ...getSnapshotsHanlder);
app.post("/user", ...registerHandler);
app.get("/video/:id/info", ...videoInfoHandler);
}
export const rootHandler = createHandlers((c) => {
let singer: Singer | Singer[];
const shouldShowSpecialSinger = Math.random() < 0.016;
if (getSingerForBirthday().length !== 0) {
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
for (const s of singer) {
delete s.birthday;
s.message = `${s.name}生日快乐~`;
}
} else if (shouldShowSpecialSinger) {
singer = pickSpecialSinger();
} else {
singer = pickSinger();
}
return c.json({
project: {
name: "中V档案馆",
motto: "一起唱吧,心中的歌!"
},
status: 200,
version: VERSION,
time: Date.now(),
singer: singer
});
});

View File

@@ -0,0 +1,105 @@
import { Context } from "hono";
import { Bindings, BlankEnv } from "hono/types";
import { ErrorResponse, LoginResponse } from "src/schema";
import { createHandlers } from "src/utils.ts";
import { sqlCred } from "@core/db/dbNew";
import { object, string, ValidationError } from "yup";
import { setCookie } from "hono/cookie";
import Argon2id from "@rabbit-company/argon2id";
import { createLoginSession } from "routes/user/POST";
import { UserType } from "@core/db/schema";
const LoginBodySchema = object({
username: string().trim().required("Username is required").max(50, "Username cannot exceed 50 characters"),
password: string().required("Password is required")
});
export const loginHandler = createHandlers(
async (c: Context<BlankEnv & { Bindings: Bindings }, "/user/session/:id">) => {
try {
const body = await LoginBodySchema.validate(await c.req.json());
const { username, password: submittedPassword } = body;
const result = await sqlCred<UserType[]>`
SELECT *
FROM users
WHERE username = ${username}
`;
if (result.length === 0) {
const response: ErrorResponse<string> = {
message: `User does not exist.`,
errors: [`User ${username} does not exist.`],
code: "ENTITY_NOT_FOUND"
};
return c.json<ErrorResponse<string>>(response, 400);
}
const storedPassword = result[0].password;
const uid = result[0].id;
const nickname = result[0].nickname;
const role = result[0].role;
const passwordAreSame = await Argon2id.verify(storedPassword, submittedPassword);
if (!passwordAreSame) {
const response: ErrorResponse<string> = {
message: "Incorrect password.",
errors: [],
i18n: {
key: "backend.error.incorrect_password"
},
code: "INVALID_CREDENTIALS"
};
return c.json<ErrorResponse<string>>(response, 401);
}
const sessionID = await createLoginSession(uid, c);
const response: LoginResponse = {
uid: uid,
username: username,
nickname: nickname,
role: role,
token: sessionID
};
const A_YEAR = 365 * 86400;
const isDev = process.env.NODE_ENV === "development";
setCookie(c, "session_id", sessionID, {
path: "/",
maxAge: A_YEAR,
domain: process.env.DOMAIN,
secure: isDev ? true : true,
sameSite: isDev ? "None" : "Lax",
httpOnly: true
});
return c.json<LoginResponse>(response, 200);
} catch (e) {
if (e instanceof ValidationError) {
const response: ErrorResponse<string> = {
message: "Invalid registration data.",
errors: e.errors,
code: "INVALID_PAYLOAD"
};
return c.json<ErrorResponse<string>>(response, 400);
} else if (e instanceof SyntaxError) {
const response: ErrorResponse<string> = {
message: "Invalid JSON payload.",
errors: [e.message],
code: "INVALID_FORMAT"
};
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<string> = {
message: "Unknown error.",
errors: [(e as Error).message],
code: "UNKNOWN_ERROR"
};
return c.json<ErrorResponse<string>>(response, 500);
}
}
}
);
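
A hedged sketch of the exchange this handler implements (all values illustrative):

// POST /login/session
// { "username": "alice", "password": "hunter2" }
//
// 200 OK
// Set-Cookie: session_id=<24-char id>; Path=/; Max-Age=31536000; HttpOnly
// { "uid": 42, "username": "alice", "nickname": "Alice", "role": "user", "token": "<same id>" }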

View File

@@ -1,24 +0,0 @@
import { getClientIP } from "middleware/logger.ts";
import { createHandlers } from "../src/utils.ts";
import { VERSION } from "../src/main.ts";
export const pingHandler = createHandlers(async (c) => {
const requestHeaders = c.req.raw.headers;
return c.json({
"message": "pong",
"request": {
"headers": requestHeaders,
"ip": getClientIP(c),
"mode": c.req.raw.mode,
"method": c.req.method,
"query": new URL(c.req.url).searchParams,
"body": await c.req.text(),
"url": c.req.raw.url
},
"response": {
"time": new Date().getTime(),
"status": 200,
"version": VERSION,
}
});
});

View File

@@ -0,0 +1,24 @@
import { getClientIP } from "middleware/logger.ts";
import { createHandlers } from "src/utils.ts";
import { VERSION } from "src/main.ts";
export const pingHandler = createHandlers(async (c) => {
const requestHeaders = c.req.raw.headers;
return c.json({
message: "pong",
request: {
headers: requestHeaders,
ip: getClientIP(c),
mode: c.req.raw.mode,
method: c.req.method,
query: new URL(c.req.url).searchParams,
body: await c.req.text(),
url: c.req.raw.url
},
response: {
time: new Date().getTime(),
status: 200,
version: VERSION
}
});
});

View File

@@ -1,29 +0,0 @@
import { getSingerForBirthday, pickSinger, pickSpecialSinger, type Singer } from "./singers.ts";
import { VERSION } from "../../src/main.ts";
import { createHandlers } from "../../src/utils.ts";
export const rootHandler = createHandlers((c) => {
let singer: Singer | Singer[];
const shouldShowSpecialSinger = Math.random() < 0.016;
if (getSingerForBirthday().length !== 0) {
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
for (const s of singer) {
delete s.birthday;
s.message = `${s.name}生日快乐~`;
}
} else if (shouldShowSpecialSinger) {
singer = pickSpecialSinger();
} else {
singer = pickSinger();
}
return c.json({
project: {
name: "中V档案馆",
motto: "一起唱吧,心中的歌!"
},
status: 200,
version: VERSION,
time: Date.now(),
singer: singer
});
});

View File

@@ -0,0 +1,75 @@
import { Context } from "hono";
import { Bindings, BlankEnv } from "hono/types";
import { ErrorResponse } from "src/schema";
import { createHandlers } from "src/utils.ts";
import { sqlCred } from "@core/db/dbNew";
import { object, string, ValidationError } from "yup";
import { setCookie } from "hono/cookie";
const loginSessionExists = async (sessionID: string) => {
const result = await sqlCred`
SELECT 1
FROM login_sessions
WHERE id = ${sessionID}
`;
return result.length > 0;
};
export const logoutHandler = createHandlers(async (c: Context<BlankEnv & { Bindings: Bindings }, "/session/:id">) => {
try {
const session_id = c.req.param("id");
const exists = await loginSessionExists(session_id);
if (!exists) {
const response: ErrorResponse<string> = {
message: "Cannot find given session_id.",
errors: [`Session ${session_id} not found`],
code: "ENTITY_NOT_FOUND"
};
return c.json<ErrorResponse<string>>(response, 404);
}
await sqlCred`
UPDATE login_sessions
SET deactivated_at = CURRENT_TIMESTAMP
WHERE id = ${session_id}
`;
const isDev = process.env.NODE_ENV === "development";
setCookie(c, "session_id", "", {
path: "/",
maxAge: 0,
domain: process.env.DOMAIN,
secure: isDev ? true : true,
sameSite: isDev ? "None" : "Lax",
httpOnly: true
});
return c.body(null, 204);
} catch (e) {
if (e instanceof ValidationError) {
const response: ErrorResponse<string> = {
message: "Invalid registration data.",
errors: e.errors,
code: "INVALID_PAYLOAD"
};
return c.json<ErrorResponse<string>>(response, 400);
} else if (e instanceof SyntaxError) {
const response: ErrorResponse<string> = {
message: "Invalid JSON payload.",
errors: [e.message],
code: "INVALID_FORMAT"
};
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<string> = {
message: "Unknown error.",
errors: [(e as Error).message],
code: "UNKNOWN_ERROR"
};
return c.json<ErrorResponse<string>>(response, 500);
}
}
});

View File

@@ -0,0 +1 @@
export * from "./[id]/DELETE";

View File

@@ -1,74 +0,0 @@
import { createHandlers } from "src/utils.ts";
import Argon2id from "@rabbit-company/argon2id";
import { object, string, ValidationError } from "yup";
import type { Context } from "hono";
import type { Bindings, BlankEnv, BlankInput } from "hono/types";
import { sqlCred } from "db/db.ts";
import { ErrorResponse, StatusResponse } from "src/schema";
const RegistrationBodySchema = object({
username: string().trim().required("Username is required").max(50, "Username cannot exceed 50 characters"),
password: string().required("Password is required"),
nickname: string().optional()
});
type ContextType = Context<BlankEnv & { Bindings: Bindings }, "/user", BlankInput>;
export const userExists = async (username: string) => {
const result = await sqlCred`
SELECT 1
FROM users
WHERE username = ${username}
`;
return result.length > 0;
};
export const registerHandler = createHandlers(async (c: ContextType) => {
try {
const body = await RegistrationBodySchema.validate(await c.req.json());
const { username, password, nickname } = body;
if (await userExists(username)) {
const response: StatusResponse = {
message: `User "${username}" already exists.`
};
return c.json<StatusResponse>(response, 400);
}
const hash = await Argon2id.hashEncoded(password);
await sqlCred`
INSERT INTO users (username, password, nickname)
VALUES (${username}, ${hash}, ${nickname ? nickname : null})
`;
const response: StatusResponse = {
message: `User '${username}' registered successfully.`
}
return c.json<StatusResponse>(response, 201);
} catch (e) {
if (e instanceof ValidationError) {
const response: ErrorResponse<string> = {
message: "Invalid registration data.",
errors: e.errors,
code: "INVALID_PAYLOAD"
}
return c.json<ErrorResponse<string>>(response, 400);
} else if (e instanceof SyntaxError) {
const response: ErrorResponse<string> = {
message: "Invalid JSON payload.",
errors: [e.message],
code: "INVALID_FORMAT"
}
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<string> = {
message: "Invalid JSON payload.",
errors: [(e as Error).message],
code: "UNKNOWN_ERR"
}
return c.json<ErrorResponse<string>>(response, 500);
}
}
});

View File

@@ -0,0 +1,140 @@
import { createHandlers } from "src/utils.ts";
import Argon2id from "@rabbit-company/argon2id";
import { object, string, ValidationError } from "yup";
import type { Context } from "hono";
import type { Bindings, BlankEnv, BlankInput } from "hono/types";
import { sqlCred } from "@core/db/dbNew.ts";
import { ErrorResponse, SignUpResponse } from "src/schema";
import { generateRandomId } from "@core/lib/randomID";
import { getUserIP } from "@/middleware/rateLimiters";
import { setCookie } from "hono/cookie";
const RegistrationBodySchema = object({
username: string().trim().required("Username is required").max(50, "Username cannot exceed 50 characters"),
password: string().required("Password is required"),
nickname: string().optional()
});
type ContextType = Context<BlankEnv & { Bindings: Bindings }, "/user", BlankInput>;
export const userExists = async (username: string) => {
const result = await sqlCred`
SELECT 1
FROM users
WHERE username = ${username}
`;
return result.length > 0;
};
export const createLoginSession = async (uid: number, c: Context): Promise<string> => {
const ipAddress = getUserIP(c) || null;
const userAgent = c.req.header("User-Agent") || null;
const id = generateRandomId(24);
await sqlCred`
INSERT INTO login_sessions (id, uid, expire_at, ip_address, user_agent)
VALUES (${id}, ${uid}, CURRENT_TIMESTAMP + INTERVAL '1 year', ${ipAddress}, ${userAgent})
`;
return id;
};
const getUserIDByName = async (username: string) => {
const result = await sqlCred<{ id: number }[]>`
SELECT id
FROM users
WHERE username = ${username}
`;
if (result.length === 0) {
return null;
}
return result[0].id;
};
export const registerHandler = createHandlers(async (c: ContextType) => {
try {
const body = await RegistrationBodySchema.validate(await c.req.json());
const { username, password, nickname } = body;
if (await userExists(username)) {
const response: ErrorResponse = {
message: `User "${username}" already exists.`,
code: "ENTITY_EXISTS",
errors: [],
i18n: {
key: "backend.error.user_exists",
values: {
username: username
}
}
};
return c.json<ErrorResponse>(response, 400);
}
const hash = await Argon2id.hashEncoded(password);
await sqlCred`
INSERT INTO users (username, password, nickname)
VALUES (${username}, ${hash}, ${nickname ? nickname : null})
`;
const uid = await getUserIDByName(username);
if (!uid) {
const response: ErrorResponse<string> = {
message: "Cannot find registered user.",
errors: [`Cannot find user ${username} in table 'users'.`],
code: "ENTITY_NOT_FOUND",
i18n: {
key: "backend.error.user_not_found_after_register",
values: {
username: username
}
}
};
return c.json<ErrorResponse<string>>(response, 500);
}
const sessionID = await createLoginSession(uid, c);
const response: SignUpResponse = {
username: username,
token: sessionID
};
const A_YEAR = 365 * 86400;
const isDev = process.env.NODE_ENV === "development";
setCookie(c, "session_id", sessionID, {
path: "/",
maxAge: A_YEAR,
domain: process.env.DOMAIN,
secure: isDev ? false : true,
sameSite: "Lax",
httpOnly: true
});
return c.json<SignUpResponse>(response, 201);
} catch (e) {
if (e instanceof ValidationError) {
const response: ErrorResponse<string> = {
message: "Invalid registration data.",
errors: e.errors,
code: "INVALID_PAYLOAD"
};
return c.json<ErrorResponse<string>>(response, 400);
} else if (e instanceof SyntaxError) {
const response: ErrorResponse<string> = {
message: "Invalid JSON payload.",
errors: [e.message],
code: "INVALID_FORMAT"
};
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<string> = {
message: "Unknown error.",
errors: [(e as Error).message],
code: "UNKNOWN_ERROR"
};
return c.json<ErrorResponse<string>>(response, 500);
}
}
});
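
And the corresponding registration exchange (again illustrative; note the cookie drops Secure in development):

// POST /user   (rate-limited and captcha-guarded, per middleware.ts below)
// { "username": "alice", "password": "hunter2", "nickname": "Alice" }
//
// 201 Created
// Set-Cookie: session_id=<24-char id>; Path=/; Max-Age=31536000; HttpOnly; SameSite=Lax; Secure
// { "username": "alice", "token": "<same id>" }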

View File

@@ -0,0 +1,2 @@
export * from "./POST.ts";
export * from "./session/[id]/GET.ts";

View File

@@ -0,0 +1,32 @@
import { Context } from "hono";
import { Bindings, BlankEnv } from "hono/types";
import { ErrorResponse } from "src/schema";
import { createHandlers } from "src/utils.ts";
import { sqlCred } from "@core/db/dbNew";
import { UserType } from "@core/db/schema";
export const getUserByLoginSessionHandler = createHandlers(
async (c: Context<BlankEnv & { Bindings: Bindings }, "/user/session/:id">) => {
const id = c.req.param("id");
const users = await sqlCred<UserType[]>`
SELECT u.*
FROM users u
JOIN login_sessions ls ON u.id = ls.uid
WHERE ls.id = ${id};
`;
if (users.length === 0) {
const response: ErrorResponse = {
message: "Cannot find user",
code: "ENTITY_NOT_FOUND",
errors: []
};
return c.json<ErrorResponse>(response, 404);
}
const user = users[0];
return c.json({
username: user.username,
nickname: user.nickname,
role: user.role
});
}
);

View File

@@ -1,15 +1,15 @@
import logger from "@core/log/logger.ts";
import { redis } from "@core/db/redis.ts";
import { sql } from "./db.ts";
import { sql } from "@core/db/dbNew.ts";
import { number, ValidationError } from "yup";
import { createHandlers } from "../src/utils.ts";
import { createHandlers } from "@/src/utils.ts";
import { getVideoInfo, getVideoInfoByBV } from "@core/net/getVideoInfo.ts";
import { idSchema } from "../routes/snapshots.ts";
import { idSchema } from "./snapshots.ts";
import { NetSchedulerError } from "@core/net/delegate.ts";
import type { Context } from "hono";
import type { BlankEnv, BlankInput } from "hono/types";
import type { VideoInfoData } from "@core/net/bilibili.d.ts";
import { startTime, endTime } from 'hono/timing'
import { startTime, endTime } from "hono/timing";
const CACHE_EXPIRATION_SECONDS = 60;
@@ -34,7 +34,7 @@ async function insertVideoSnapshot(data: VideoInfoData) {
}
export const videoInfoHandler = createHandlers(async (c: ContextType) => {
startTime(c, 'parse', 'Parse the request');
startTime(c, "parse", "Parse the request");
try {
const id = await idSchema.validate(c.req.param("id"));
let videoId: string | number = id as string;
@@ -45,33 +45,33 @@ export const videoInfoHandler = createHandlers(async (c: ContextType) => {
}
const cacheKey = `cvsa:videoInfo:${videoId}`;
endTime(c, 'parse');
startTime(c, 'cache', 'Check for cached data');
endTime(c, "parse");
startTime(c, "cache", "Check for cached data");
const cachedData = await redis.get(cacheKey);
endTime(c, 'cache');
endTime(c, "cache");
if (cachedData) {
return c.json(JSON.parse(cachedData));
}
startTime(c, 'net', 'Fetch data');
startTime(c, "net", "Fetch data");
let result: VideoInfoData | number;
if (typeof videoId === "number") {
result = await getVideoInfo(videoId, "getVideoInfo");
} else {
result = await getVideoInfoByBV(videoId, "getVideoInfo");
}
endTime(c, 'net');
endTime(c, "net");
if (typeof result === "number") {
return c.json({ message: "Error fetching video info", code: result }, 500);
}
startTime(c, 'db', 'Write data to database');
startTime(c, "db", "Write data to database");
await redis.setex(cacheKey, CACHE_EXPIRATION_SECONDS, JSON.stringify(result));
await insertVideoSnapshot(result);
endTime(c, 'db');
endTime(c, "db");
return c.json(result);
} catch (e) {
if (e instanceof ValidationError) {

View File

@@ -1,10 +1,10 @@
import type { Context } from "hono";
import { createHandlers } from "../src/utils.ts";
import { createHandlers } from "src/utils.ts";
import type { BlankEnv, BlankInput } from "hono/types";
import { getVideoSnapshots, getVideoSnapshotsByBV } from "../db/videoSnapshot.ts";
import { getVideoSnapshots, getVideoSnapshotsByBV } from "db/snapshots.ts";
import type { VideoSnapshotType } from "@core/db/schema.d.ts";
import { boolean, mixed, number, object, ValidationError } from "yup";
import { ErrorResponse } from "../src/schema";
import { ErrorResponse } from "src/schema";
import { startTime, endTime } from "hono/timing";
const SnapshotQueryParamsSchema = object({
@@ -96,7 +96,7 @@ export const getSnapshotsHanlder = createHandlers(async (c: ContextType) => {
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<unknown> = {
code: "UNKNOWN_ERR",
code: "UNKNOWN_ERROR",
message: "Unhandled error",
errors: [e]
};

View File

@@ -0,0 +1,2 @@
export * from "./[id]/info";
export * from "./[id]/snapshots";

View File

@@ -0,0 +1,65 @@
import type { Context } from "hono";
import { createHandlers } from "src/utils.ts";
import type { BlankEnv, BlankInput } from "hono/types";
import { number, object, ValidationError } from "yup";
import { ErrorResponse } from "src/schema";
import { startTime, endTime } from "hono/timing";
import { getVideosInViewsRange } from "@/db/latestSnapshots";
const SnapshotQueryParamsSchema = object({
min_views: number().integer().optional().positive(),
max_views: number().integer().optional().positive()
});
type ContextType = Context<BlankEnv, "/videos", BlankInput>;
export const getVideosHanlder = createHandlers(async (c: ContextType) => {
startTime(c, "parse", "Parse the request");
try {
const queryParams = await SnapshotQueryParamsSchema.validate(c.req.query());
const { min_views, max_views } = queryParams;
if (!min_views && !max_views) {
const response: ErrorResponse<string> = {
code: "INVALID_QUERY_PARAMS",
message: "Invalid query parameters",
errors: ["Must provide one of these query parameters: min_views, max_views"]
};
return c.json<ErrorResponse<string>>(response, 400);
}
endTime(c, "parse");
startTime(c, "db", "Query the database");
const minViews = min_views ? min_views : 0;
const maxViews = max_views ? max_views : 2147483647;
const result = await getVideosInViewsRange(minViews, maxViews);
endTime(c, "db");
const rows = result.map((row) => ({
...row,
aid: Number(row.aid)
}));
return c.json(rows);
} catch (e: unknown) {
if (e instanceof ValidationError) {
const response: ErrorResponse<string> = {
code: "INVALID_QUERY_PARAMS",
message: "Invalid query parameters",
errors: e.errors
};
return c.json<ErrorResponse<string>>(response, 400);
} else {
const response: ErrorResponse<unknown> = {
code: "UNKNOWN_ERROR",
message: "Unhandled error",
errors: [e]
};
return c.json<ErrorResponse<unknown>>(response, 500);
}
}
});
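
A quick request sketch for the new route (host hypothetical; when only one bound is given, the other defaults to 0 or 2147483647):

// GET /videos?min_views=900000&max_views=1000000
// -> up to 5000 rows, views descending; omitting both params yields a 400.
const videos = await fetch(
	"https://api.example.com/videos?min_views=900000&max_views=1000000"
).then((r) => r.json());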

View File

@@ -0,0 +1 @@
export * from "./GET.ts";

View File

@@ -1,8 +1,8 @@
import { Hono } from "hono";
import type { TimingVariables } from "hono/timing";
import { startServer } from "./startServer.ts";
import { configureRoutes } from "routes";
import { configureMiddleWares } from "middleware";
import { configureRoutes } from "./routing.ts";
import { configureMiddleWares } from "./middleware.ts";
import { notFoundRoute } from "routes/404.ts";
type Variables = TimingVariables;
@@ -15,4 +15,4 @@ configureRoutes(app);
await startServer(app);
export const VERSION = "0.4.6";
export const VERSION = "0.6.0";

View File

@@ -0,0 +1,24 @@
import { Hono } from "hono";
import { timing } from "hono/timing";
import { Variables } from "hono/types";
import { pingHandler } from "routes/ping";
import { logger } from "middleware/logger.ts";
import { corsMiddleware } from "@/middleware/cors";
import { contentType } from "middleware/contentType.ts";
import { captchaMiddleware } from "middleware/captcha.ts";
import { bodyLimitForPing } from "middleware/bodyLimits.ts";
import { registerRateLimiter } from "middleware/rateLimiters.ts";
import { preetifyResponse } from "middleware/preetifyResponse.ts";
export function configureMiddleWares(app: Hono<{ Variables: Variables }>) {
app.use("*", corsMiddleware);
app.use("*", contentType);
app.use(timing());
app.use("*", preetifyResponse);
app.use("*", logger({}));
app.post("/user", registerRateLimiter);
app.post("/user", captchaMiddleware);
app.all("/ping", bodyLimitForPing, ...pingHandler);
}

View File

@@ -0,0 +1,32 @@
import { rootHandler } from "routes";
import { pingHandler } from "routes/ping";
import { getUserByLoginSessionHandler, registerHandler } from "routes/user";
import { videoInfoHandler, getSnapshotsHanlder } from "routes/video";
import { Hono } from "hono";
import { Variables } from "hono/types";
import { createCaptchaSessionHandler, verifyChallengeHandler } from "routes/captcha";
import { getCaptchaDifficultyHandler } from "routes/captcha/difficulty/GET.ts";
import { getVideosHanlder } from "@/routes/videos";
import { loginHandler } from "@/routes/login/session/POST";
import { logoutHandler } from "@/routes/session";
export function configureRoutes(app: Hono<{ Variables: Variables }>) {
app.get("/", ...rootHandler);
app.all("/ping", ...pingHandler);
app.get("/videos", ...getVideosHanlder);
app.get("/video/:id/snapshots", ...getSnapshotsHanlder);
app.get("/video/:id/info", ...videoInfoHandler);
app.post("/login/session", ...loginHandler);
app.delete("/session/:id", ...logoutHandler);
app.post("/user", ...registerHandler);
app.get("/user/session/:id", ...getUserByLoginSessionHandler);
app.post("/captcha/session", ...createCaptchaSessionHandler);
app.get("/captcha/:id/result", ...verifyChallengeHandler);
app.get("/captcha/difficulty", ...getCaptchaDifficultyHandler);
}

View File

@@ -1,11 +1,66 @@
type ErrorCode = "INVALID_QUERY_PARAMS" | "UNKNOWN_ERR" | "INVALID_PAYLOAD" | "INVALID_FORMAT" | "BODY_TOO_LARGE";
export type ErrorCode =
| "INVALID_QUERY_PARAMS"
| "UNKNOWN_ERROR"
| "INVALID_PAYLOAD"
| "INVALID_FORMAT"
| "INVALID_HEADER"
| "BODY_TOO_LARGE"
| "UNAUTHORIZED"
| "INVALID_CREDENTIALS"
| "ENTITY_NOT_FOUND"
| "SERVER_ERROR"
| "RATE_LIMIT_EXCEEDED"
| "ENTITY_EXISTS";
export interface ErrorResponse<E> {
code: ErrorCode
export interface ErrorResponse<E = string> {
code: ErrorCode;
message: string;
errors: E[];
errors: E[] = [];
i18n?: {
key: string;
values?: {
[key: string]: string | number | Date;
};
};
}
export interface StatusResponse {
message: string;
}
export type CaptchaSessionResponse = ErrorResponse | CaptchaSessionRawResponse;
interface CaptchaSessionRawResponse {
success: boolean;
id: string;
g: string;
n: string;
t: number;
}
export interface LoginResponse {
uid: number;
username: string;
nickname: string | null;
role: string;
token: string;
}
export interface SignUpResponse {
username: string;
token: string;
}
export interface UserResponse {
username: string;
nickname: string | null;
role: string;
}
export type CaptchaVerificationRawResponse = {
token: string;
}
export type CaptchaVerificationResponse =
| ErrorResponse
| CaptchaVerificationRawResponse;
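
An example body under the widened schema, shaped like the ENTITY_EXISTS error the register handler above returns (values illustrative):

// {
//   "code": "ENTITY_EXISTS",
//   "message": "User \"alice\" already exists.",
//   "errors": [],
//   "i18n": {
//     "key": "backend.error.user_exists",
//     "values": { "username": "alice" }
//   }
// }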

View File

@@ -32,7 +32,7 @@ function logStartup(hostname: string, port: number, wasAutoIncremented: boolean,
console.log("\nPress Ctrl+C to quit.");
}
export async function startServer(app: Hono<{Variables: Variables }>) {
export async function startServer(app: Hono<{ Variables: Variables }>) {
const NODE_ENV = process.env.NODE_ENV || "production";
const HOST = process.env.HOST ?? (NODE_ENV === "development" ? "0.0.0.0" : "127.0.0.1");
const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : undefined;

1
packages/backend/src/types.d.ts vendored Normal file
View File

@@ -0,0 +1 @@
export * from "./schema";

View File

@@ -10,6 +10,7 @@
"skipLibCheck": true,
"paths": {
"@core/*": ["../core/*"],
"@/*": ["./*"],
"@crawler/*": ["../crawler/*"]
},
"allowSyntheticDefaultImports": true,

View File

@@ -1,6 +1,8 @@
import postgres from "postgres";
import { postgresConfigNpm } from "./pgConfigNew";
import { postgresConfigCred, postgresConfig } from "./pgConfigNew";
export const sql = postgres(postgresConfigNpm);
export const sql = postgres(postgresConfig);
export const sqlTest = postgres(postgresConfigNpm);
export const sqlCred = postgres(postgresConfigCred);
export const sqlTest = postgres(postgresConfig);

View File

@@ -1,27 +1,23 @@
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT", "DB_NAME_CRED"];
const unsetVars = requiredEnvVars.filter((key) => process.env[key] === undefined);
const getEnvVar = (key: string) => {
return process.env[key] || import.meta.env[key];
};
const unsetVars = requiredEnvVars.filter((key) => getEnvVar(key) === undefined);
if (unsetVars.length > 0) {
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
}
const databaseHost = process.env["DB_HOST"]!;
const databaseName = process.env["DB_NAME"];
const databaseNameCred = process.env["DB_NAME_CRED"]!;
const databaseUser = process.env["DB_USER"]!;
const databasePassword = process.env["DB_PASSWORD"]!;
const databasePort = process.env["DB_PORT"]!;
const databaseHost = getEnvVar("DB_HOST")!;
const databaseName = getEnvVar("DB_NAME");
const databaseNameCred = getEnvVar("DB_NAME_CRED")!;
const databaseUser = getEnvVar("DB_USER")!;
const databasePassword = getEnvVar("DB_PASSWORD")!;
const databasePort = getEnvVar("DB_PORT")!;
export const postgresConfig = {
hostname: databaseHost,
port: parseInt(databasePort),
database: databaseName,
user: databaseUser,
password: databasePassword
};
export const postgresConfigNpm = {
host: databaseHost,
port: parseInt(databasePort),
database: databaseName,

View File

@@ -1,3 +1,3 @@
import type postgres from "postgres";
export type Psql = postgres.Sql<{}>;
export type Psql = postgres.Sql;

View File

@@ -1,16 +1,3 @@
export interface AllDataType {
id: number;
aid: number;
bvid: string | null;
description: string | null;
uid: number | null;
tags: string | null;
title: string | null;
published_at: string | null;
duration: number;
created_at: string | null;
}
export interface BiliUserType {
id: number;
uid: number;
@@ -21,7 +8,7 @@
export interface VideoSnapshotType {
id: number;
created_at: string;
created_at: Date;
views: number;
coins: number;
likes: number;
@@ -48,8 +35,33 @@ export interface SnapshotScheduleType {
id: number;
aid: number;
type?: string;
created_at: string;
started_at?: string;
finished_at?: string;
created_at: Date;
started_at?: Date;
finished_at?: Date;
status: string;
}
export interface UserType {
id: number;
username: string;
nickname: string | null;
password: string;
unq_id: string;
role: string;
created_at: Date;
}
export interface BiliVideoMetadataType {
id: number;
aid: number;
bvid: string | null;
description: string | null;
uid: number | null;
tags: string | null;
title: string | null;
published_at: Date | null;
duration: number | null;
created_at: Date;
status: number;
cover_url: string | null;
}

View File

@@ -1,62 +0,0 @@
import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import type { VideoSnapshotType } from "./schema.d.ts";
export async function getVideoSnapshots(
client: Client,
aid: number,
limit: number,
pageOrOffset: number,
reverse: boolean,
mode: "page" | "offset" = "page",
) {
const offset = mode === "page" ? (pageOrOffset - 1) * limit : pageOrOffset;
const queryDesc: string = `
SELECT *
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT $2
OFFSET $3
`;
const queryAsc: string = `
SELECT *
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at
LIMIT $2 OFFSET $3
`;
const query = reverse ? queryAsc : queryDesc;
const queryResult = await client.queryObject<VideoSnapshotType>(query, [aid, limit, offset]);
return queryResult.rows;
}
export async function getVideoSnapshotsByBV(
client: Client,
bv: string,
limit: number,
pageOrOffset: number,
reverse: boolean,
mode: "page" | "offset" = "page",
) {
const offset = mode === "page" ? (pageOrOffset - 1) * limit : pageOrOffset;
const queryAsc = `
SELECT vs.*
FROM video_snapshot vs
JOIN bilibili_metadata bm ON vs.aid = bm.aid
WHERE bm.bvid = $1
ORDER BY vs.created_at
LIMIT $2
OFFSET $3
`;
const queryDesc: string = `
SELECT *
FROM video_snapshot vs
JOIN bilibili_metadata bm ON vs.aid = bm.aid
WHERE bm.bvid = $1
ORDER BY vs.created_at DESC
LIMIT $2 OFFSET $3
`;
const query = reverse ? queryAsc : queryDesc;
const queryResult = await client.queryObject<VideoSnapshotType>(query, [bv, limit, offset]);
return queryResult.rows;
}

View File

@@ -1,17 +0,0 @@
{
"name": "@cvsa/core",
"exports": "./main.ts",
"imports": {
"ioredis": "npm:ioredis",
"log/": "./log/",
"db/": "./db/",
"$std/": "https://deno.land/std@0.216.0/",
"mq/": "./mq/",
"chalk": "npm:chalk",
"winston": "npm:winston",
"logform": "npm:logform",
"@core/": "./",
"child_process": "node:child_process",
"util": "node:util"
}
}

1
packages/core/index.ts Normal file
View File

@@ -0,0 +1 @@
export * from "./db/dbNew";

View File

@@ -0,0 +1,15 @@
export function generateRandomId(length: number): string {
const characters = 'abcdefghijkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789';
const charactersLength = characters.length;
const randomBytes = new Uint8Array(length);
crypto.getRandomValues(randomBytes);
let result = '';
for (let i = 0; i < length; i++) {
const randomIndex = randomBytes[i] % charactersLength;
result += characters.charAt(randomIndex);
}
return result;
}

View File

@@ -1 +0,0 @@
export const DB_VERSION = 10;

View File

@@ -1,5 +1,5 @@
import { Redis } from "ioredis";
import { redis } from "../../core/db/redis.ts";
import { redis } from "@core/db/redis.ts";
class LockManager {
private redis: Redis;

View File

@@ -0,0 +1,55 @@
import { RateLimiter as Limiter } from "@koshnic/ratelimit";
import { redis } from "@core/db/redis.ts";
export interface RateLimiterConfig {
duration: number;
max: number;
}
export class RateLimiterError extends Error {
public code: string;
constructor(message: string) {
super(message);
this.name = "RateLimiterError";
this.code = "RATE_LIMIT_EXCEEDED";
}
}
export class MultipleRateLimiter {
private readonly name: string;
private readonly configs: RateLimiterConfig[] = [];
private readonly limiter: Limiter;
/*
* @param name The name of the rate limiter
* @param configs The configuration of the rate limiter, containing:
* - duration: The duration of window in seconds
* - max: The maximum number of tokens allowed in the window
*/
constructor(
name: string,
configs: RateLimiterConfig[]
) {
this.configs = configs;
this.limiter = new Limiter(redis);
this.name = name;
}
/*
* Trigger an event in the rate limiter
*/
async trigger(shouldThrow = true): Promise<void> {
for (let i = 0; i < this.configs.length; i++) {
const { duration, max } = this.configs[i];
const { allowed } = await this.limiter.allow(`cvsa:${this.name}_${i}`, {
burst: max,
ratePerPeriod: max,
period: duration,
cost: 1
});
if (!allowed && shouldThrow) {
throw new RateLimiterError("Rate limit exceeded")
}
}
}
}
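
A minimal usage sketch for the new MultipleRateLimiter (assumes the shared redis client from @core/db/redis.ts is reachable; the limiter name and window values are made up for illustration):

import { MultipleRateLimiter, RateLimiterError } from "@core/mq/multipleRateLimiter.ts";

const limiter = new MultipleRateLimiter("example_api", [
	{ duration: 1, max: 5 }, // at most 5 events per second
	{ duration: 60, max: 100 } // and at most 100 events per minute
]);

try {
	await limiter.trigger(); // throws RateLimiterError once any window is exhausted
	// ...do the rate-limited work here
} catch (e) {
	if (e instanceof RateLimiterError) {
		// back off and retry later
	}
}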

View File

@ -1,56 +0,0 @@
import type { SlidingWindow } from "./slidingWindow.ts";
export interface RateLimiterConfig {
window: SlidingWindow;
max: number;
}
export class RateLimiter {
private readonly configs: RateLimiterConfig[];
private readonly configEventNames: string[];
/*
* @param name The name of the rate limiter
* @param configs The configuration of the rate limiter, containing:
* - window: The sliding window to use
* - max: The maximum number of events allowed in the window
*/
constructor(name: string, configs: RateLimiterConfig[]) {
this.configs = configs;
this.configEventNames = configs.map((_, index) => `${name}_config_${index}`);
}
/*
* Check if the event has reached the rate limit
*/
async getAvailability(): Promise<boolean> {
for (let i = 0; i < this.configs.length; i++) {
const config = this.configs[i];
const eventName = this.configEventNames[i];
const count = await config.window.count(eventName);
if (count >= config.max) {
return false;
}
}
return true;
}
/*
* Trigger an event in the rate limiter
*/
async trigger(): Promise<void> {
for (let i = 0; i < this.configs.length; i++) {
const config = this.configs[i];
const eventName = this.configEventNames[i];
await config.window.event(eventName);
}
}
async clear(): Promise<void> {
for (let i = 0; i < this.configs.length; i++) {
const config = this.configs[i];
const eventName = this.configEventNames[i];
await config.window.clear(eventName);
}
}
}

View File

@ -32,14 +32,20 @@ export class SlidingWindow {
/*
* Count the number of events in the sliding window
* @param eventName The name of the event
* @param {string} eventName The name of the event
* @param {number} [duration] The duration of the window in seconds
*/
async count(eventName: string): Promise<number> {
async count(eventName: string, duration?: number): Promise<number> {
const key = `cvsa:sliding_window:${eventName}`;
const now = Date.now();
// Remove timestamps outside the window
await this.redis.zremrangebyscore(key, 0, now - this.windowSize);
if (duration) {
return this.redis.zcount(key, now - duration * 1000, now);
}
// Get the number of timestamps in the window
return this.redis.zcard(key);
}
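
With the optional duration added above, one SlidingWindow can now answer counts over a sub-window of the stored timestamps. A sketch of the difference (assumes the constructor shape seen elsewhere in this diff, new SlidingWindow(redis, windowSizeInSeconds)):

import { redis } from "@core/db/redis.ts"; // assumed import path, as used elsewhere in this diff

const window = new SlidingWindow(redis, 30); // 30-second window
await window.event("bili_request");
const last30s = await window.count("bili_request"); // every event in the full window
const last5s = await window.count("bili_request", 5); // only events from the last 5 seconds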

View File

@ -38,6 +38,10 @@ interface VideoInfoData {
ctime: number;
desc: string;
desc_v2: string;
tname: string;
tid: number;
tid_v2: number;
tname_v2: string;
state: number;
duration: number;
owner: {

View File

@ -1,9 +1,7 @@
import logger from "@core/log/logger.ts";
import { RateLimiter, type RateLimiterConfig } from "mq/rateLimiter.ts";
import { SlidingWindow } from "mq/slidingWindow.ts";
import { redis } from "db/redis.ts";
import { MultipleRateLimiter, RateLimiterError, type RateLimiterConfig } from "@core/mq/multipleRateLimiter.ts";
import { ReplyError } from "ioredis";
import { SECOND } from "../const/time.ts";
import { SECOND } from "@core/const/time.ts";
import { spawn, SpawnOptions } from "child_process";
export function spawnPromise(
@ -73,11 +71,11 @@ export class NetSchedulerError extends Error {
}
type LimiterMap = {
[name: string]: RateLimiter;
[name: string]: MultipleRateLimiter;
};
type OptionalLimiterMap = {
[name: string]: RateLimiter | null;
[name: string]: MultipleRateLimiter | null;
};
type TaskMap = {
@ -121,20 +119,23 @@ class NetworkDelegate {
const proxies = this.getTaskProxies(taskName);
for (const proxyName of proxies) {
const limiterId = "proxy-" + proxyName + "-" + taskName;
this.proxyLimiters[limiterId] = config ? new RateLimiter(limiterId, config) : null;
this.proxyLimiters[limiterId] = config ? new MultipleRateLimiter(limiterId, config) : null;
}
}
async triggerLimiter(task: string, proxy: string): Promise<void> {
async triggerLimiter(task: string, proxy: string, force: boolean = false): Promise<void> {
const limiterId = "proxy-" + proxy + "-" + task;
const providerLimiterId = "provider-" + proxy + "-" + this.tasks[task].provider;
try {
await this.proxyLimiters[limiterId]?.trigger();
await this.providerLimiters[providerLimiterId]?.trigger();
await this.proxyLimiters[limiterId]?.trigger(!force);
await this.providerLimiters[providerLimiterId]?.trigger(!force);
} catch (e) {
const error = e as Error;
if (e instanceof ReplyError) {
logger.error(error, "redis");
} else if (e instanceof RateLimiterError) {
// Re-throw it to ensure this.request can catch it
throw e;
}
logger.warn(`Unhandled error: ${error.message}`, "mq", "proxyRequest");
}
@ -149,7 +150,7 @@ class NetworkDelegate {
}
for (const proxyName of bindProxies) {
const limiterId = "provider-" + proxyName + "-" + providerName;
this.providerLimiters[limiterId] = new RateLimiter(limiterId, config);
this.providerLimiters[limiterId] = new MultipleRateLimiter(limiterId, config);
}
}
@ -168,9 +169,15 @@ class NetworkDelegate {
// find an available proxy
const proxiesNames = this.getTaskProxies(task);
for (const proxyName of shuffleArray(proxiesNames)) {
if (await this.getProxyAvailability(proxyName, task)) {
try {
return await this.proxyRequest<R>(url, proxyName, task, method);
}
catch (e) {
if (e instanceof RateLimiterError) {
continue;
}
throw e;
}
}
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
}
@ -202,16 +209,8 @@ class NetworkDelegate {
throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND");
}
if (!force) {
const isAvailable = await this.getProxyAvailability(proxyName, task);
const limiter = "proxy-" + proxyName + "-" + task;
if (!isAvailable) {
throw new NetSchedulerError(`Proxy "${limiter}" is rate limited`, "PROXY_RATE_LIMITED");
}
}
await this.triggerLimiter(task, proxyName, force);
const result = await this.makeRequest<R>(url, proxy, method);
await this.triggerLimiter(task, proxyName);
return result;
}
@ -226,32 +225,6 @@ class NetworkDelegate {
}
}
private async getProxyAvailability(proxyName: string, taskName: string): Promise<boolean> {
try {
const task = this.tasks[taskName];
const provider = task.provider;
const proxyLimiterId = "proxy-" + proxyName + "-" + task;
const providerLimiterId = "provider-" + proxyName + "-" + provider;
if (!this.proxyLimiters[proxyLimiterId]) {
const providerLimiter = this.providerLimiters[providerLimiterId];
return await providerLimiter.getAvailability();
}
const proxyLimiter = this.proxyLimiters[proxyLimiterId];
const providerLimiter = this.providerLimiters[providerLimiterId];
const providerAvailable = await providerLimiter.getAvailability();
const proxyAvailable = await proxyLimiter.getAvailability();
return providerAvailable && proxyAvailable;
} catch (e) {
const error = e as Error;
if (e instanceof ReplyError) {
logger.error(error, "redis");
return false;
}
logger.error(error, "mq", "getProxyAvailability");
return false;
}
}
private async nativeRequest<R>(url: string, method: string): Promise<R> {
try {
const controller = new AbortController();
@ -316,37 +289,37 @@ class NetworkDelegate {
const networkDelegate = new NetworkDelegate();
const videoInfoRateLimiterConfig: RateLimiterConfig[] = [
{
window: new SlidingWindow(redis, 0.3),
duration: 0.3,
max: 1,
},
{
window: new SlidingWindow(redis, 3),
duration: 3,
max: 5,
},
{
window: new SlidingWindow(redis, 30),
duration: 30,
max: 30,
},
{
window: new SlidingWindow(redis, 2 * 60),
duration: 2 * 60,
max: 50,
},
];
const biliLimiterConfig: RateLimiterConfig[] = [
{
window: new SlidingWindow(redis, 1),
duration: 1,
max: 6,
},
{
window: new SlidingWindow(redis, 5),
duration: 5,
max: 20,
},
{
window: new SlidingWindow(redis, 30),
duration: 30,
max: 100,
},
{
window: new SlidingWindow(redis, 5 * 60),
duration: 5 * 60,
max: 200,
},
];
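
The migrated configs read the same way as before, just without a SlidingWindow instance per entry: each { duration, max } pair caps how many events one window may admit, and an event must pass every window. A plain illustration of what biliLimiterConfig enforces (not library code):

const limits = [
	{ duration: 1, max: 6 },
	{ duration: 5, max: 20 },
	{ duration: 30, max: 100 },
	{ duration: 5 * 60, max: 200 }
];

// The tightest window wins at any moment: bursts of up to 6 req/s are allowed,
// but sustained traffic is capped at 4 req/s over 5 s, about 3.3 req/s over 30 s,
// and 200 requests in any 5-minute span.
for (const { duration, max } of limits) {
	console.log(`at most ${max} requests per ${duration}s (${(max / duration).toFixed(2)} req/s)`);
}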

View File

@ -1,6 +1,13 @@
{
"name": "core",
"name": "@cvsa/core",
"private": false,
"version": "0.0.10",
"scripts": {
"test": "bun --env-file=.env.test run vitest",
"build": "bun build ./index.ts --target node --outdir ./dist"
},
"dependencies": {
"@koshnic/ratelimit": "^1.0.3",
"chalk": "^5.4.1",
"ioredis": "^5.6.1",
"logform": "^2.7.0",
@ -9,5 +16,7 @@
},
"devDependencies": {
"@types/ioredis": "^5.0.0"
}
},
"main": "./dist/index.js",
"types": "./types.d.ts"
}

View File

@ -0,0 +1,18 @@
import { describe, expect, it } from "vitest";
import { generateRandomId } from "@core/lib/randomID.ts";
describe("generateRandomId", () => {
it("should generate an ID of the specified length", () => {
const length = 15;
const id = generateRandomId(length);
expect(id).toHaveLength(length);
});
it("should generate an ID containing only allowed characters", () => {
const allowedChars = "abcdefghijkmnpqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789";
const id = generateRandomId(20);
for (const char of id) {
expect(allowedChars).toContain(char);
}
});
});

packages/core/types.d.ts vendored Normal file
View File

@ -0,0 +1,3 @@
export * from "./db/schema";
export * from "./index";
export * from "./net/bilibili";

View File

@ -1,5 +1,5 @@
import type { Psql } from "global.d.ts";
import { AllDataType, BiliUserType } from "@core/db/schema";
import type { Psql } from "@core/db/psql.d.ts";
import { BiliVideoMetadataType, BiliUserType } from "@core/db/schema";
import { AkariModelVersion } from "ml/const";
export async function videoExistsInAllData(sql: Psql, aid: number) {
@ -35,7 +35,7 @@ export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
`;
const row = rows[0];
let authorInfo = "";
if (row.uid && await userExistsInBiliUsers(sql, row.uid)) {
if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) {
const userRows = await sql<BiliUserType[]>`
SELECT * FROM bilibili_user WHERE uid = ${row.uid}
`;
@ -48,7 +48,7 @@ export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
title: row.title,
description: row.description,
tags: row.tags,
author_info: authorInfo,
author_info: authorInfo
};
}

View File

@ -1,6 +1,6 @@
import { LatestSnapshotType } from "@core/db/schema";
import { SnapshotNumber } from "mq/task/getVideoStats.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export async function getVideosNearMilestone(sql: Psql) {
const queryResult = await sql<LatestSnapshotType[]>`
@ -10,19 +10,19 @@ export async function getVideosNearMilestone(sql: Psql) {
WHERE
(views >= 50000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
(views >= 9900000 AND views < 10000000)
(views >= CEIL(views::float/1000000::float)*1000000-100000 AND views < CEIL(views::float/1000000::float)*1000000)
UNION
SELECT ls.*
FROM latest_video_snapshot ls
WHERE
(views >= 90000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
(views >= 9900000 AND views < 10000000)
(views >= CEIL(views::float/1000000::float)*1000000-100000 AND views < CEIL(views::float/1000000::float)*1000000)
`;
return queryResult.map((row) => {
return {
...row,
aid: Number(row.aid),
aid: Number(row.aid)
};
});
}
@ -40,7 +40,7 @@ export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise<nu
return {
...row,
aid: Number(row.aid),
time: new Date(row.time).getTime(),
time: new Date(row.time).getTime()
};
})[0];
}
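
The CEIL-based predicate above generalizes the old hard-coded windows: it asks whether a video sits within 100,000 views of the next whole million. A TypeScript mirror of the SQL condition as a sanity check (hypothetical helper, not part of the diff):

// true when `views` lies inside [nextMillion - 100_000, nextMillion)
function nearNextMillion(views: number): boolean {
	const nextMillion = Math.ceil(views / 1_000_000) * 1_000_000;
	return views >= nextMillion - 100_000 && views < nextMillion;
}

console.log(nearNextMillion(2_950_000)); // true: 50k short of 3,000,000
console.log(nearNextMillion(2_500_000)); // false: still 500k away
console.log(nearNextMillion(950_000)); // true: approaching the first million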

View File

@ -4,7 +4,7 @@ import { MINUTE } from "@core/const/time.ts";
import { redis } from "@core/db/redis.ts";
import { Redis } from "ioredis";
import { parseTimestampFromPsql } from "../utils/formatTimestampToPostgre.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
const REDIS_KEY = "cvsa:snapshot_window_counts";
@ -63,18 +63,6 @@ export async function snapshotScheduleExists(sql: Psql, id: number) {
return rows.length > 0;
}
export async function videoHasActiveSchedule(sql: Psql, aid: number) {
const rows = await sql<{ status: string }[]>`
SELECT status
FROM snapshot_schedule
WHERE aid = ${aid}
AND (status = 'pending'
OR status = 'processing'
)
`
return rows.length > 0;
}
export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) {
const rows = await sql<{ status: string }[]>`
SELECT status FROM snapshot_schedule
@ -91,7 +79,7 @@ export async function videoHasProcessingSchedule(sql: Psql, aid: number) {
FROM snapshot_schedule
WHERE aid = ${aid}
AND status = 'processing'
`
`;
return rows.length > 0;
}
@ -102,7 +90,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n
WHERE aid = ANY(${aids})
AND status != 'processing'
GROUP BY aid
`
`;
return rows.map((row) => Number(row.aid));
}
@ -194,7 +182,8 @@ export async function scheduleSnapshot(
aid: number,
type: string,
targetTime: number,
force: boolean = false
force: boolean = false,
adjustTime: boolean = true
) {
let adjustedTime = new Date(targetTime);
const hashActiveSchedule = await videoHasActiveScheduleWithType(sql, aid, type);
@ -216,7 +205,7 @@ export async function scheduleSnapshot(
}
}
if (hashActiveSchedule && !force) return;
if (type !== "milestone" && type !== "new") {
if (type !== "milestone" && type !== "new" && adjustTime) {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 2000, redis);
}
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
@ -236,10 +225,11 @@ export async function bulkScheduleSnapshot(
aids: number[],
type: string,
targetTime: number,
force: boolean = false
force: boolean = false,
adjustTime: boolean = true
) {
for (const aid of aids) {
await scheduleSnapshot(sql, aid, type, targetTime, force);
await scheduleSnapshot(sql, aid, type, targetTime, force, adjustTime);
}
}
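
A sketch of how a caller opts out of the start-time adjustment via the new parameter (aid and target time invented; per the condition above, adjustTime only has an effect for types other than "milestone" and "new"):

const targetTime = Date.now() + 60 * MINUTE; // MINUTE as in @core/const/time.ts

// Schedule an archive snapshot at exactly targetTime, skipping adjustSnapshotTime:
await scheduleSnapshot(sql, 170001, "archive", targetTime, false, false);

// Default behaviour is unchanged; "normal" schedules are still spread out:
await scheduleSnapshot(sql, 170001, "normal", targetTime);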
@ -292,23 +282,23 @@ export async function adjustSnapshotTime(
}
export async function getSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>`
return sql<SnapshotScheduleType[]>`
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal'
ORDER BY
CASE
WHERE started_at <= NOW() + INTERVAL '1 seconds'
AND status = 'pending'
AND type != 'normal'
ORDER BY CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 10;
`
return rows;
`;
}
export async function getBulkSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>`
return sql<SnapshotScheduleType[]>`
SELECT *
FROM snapshot_schedule
WHERE (started_at <= NOW() + INTERVAL '15 seconds')
@ -320,38 +310,34 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) {
END,
started_at
LIMIT 1000;
`
return rows;
`;
}
export async function setSnapshotStatus(sql: Psql, id: number, status: string) {
return await sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id}
return sql`
UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ${id}
`;
}
export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) {
return await sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids})
return sql`
UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ANY (${ids})
`;
}
export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) {
export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) {
const rows = await sql<{ aid: string }[]>`
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
LEFT JOIN snapshot_schedule ss ON
s.aid = ss.aid AND
(ss.status = 'pending' OR ss.status = 'processing') AND
ss.type = ${type}
WHERE ss.aid IS NULL
`;
return rows.map((r) => Number(r.aid));
}
export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) {
const rows = await psql<{ aid: number }[]>`
SELECT s.aid
FROM bilibili_metadata s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`
return rows.map((r) => Number(r.aid));
}

View File

@ -1,4 +1,4 @@
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(sql: Psql) {

View File

@ -16,8 +16,8 @@ class AkariProto extends AIManager {
constructor() {
super();
this.models = {
"classifier": onnxClassifierPath,
"embedding": onnxEmbeddingPath,
classifier: onnxClassifierPath,
embedding: onnxEmbeddingPath
};
}
@ -55,7 +55,7 @@ class AkariProto extends AIManager {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
return_tensor: false
});
const cumsum = (arr: number[]): number[] =>
@ -66,9 +66,9 @@ class AkariProto extends AIManager {
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
flattened_input_ids.length
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length])
};
const { embeddings } = await session.run(inputs);
@ -77,21 +77,14 @@ class AkariProto extends AIManager {
private async runClassification(embeddings: number[]): Promise<number[]> {
const session = this.getModelSession("classifier");
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const inputTensor = new ort.Tensor(Float32Array.from(embeddings), [1, 3, 1024]);
const { logits } = await session.run({ channel_features: inputTensor });
return this.softmax(logits.data as Float32Array);
}
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
const embeddings = await this.getJinaEmbeddings1024([
title,
description,
tags,
]);
const embeddings = await this.getJinaEmbeddings1024([title, description, tags]);
const probabilities = await this.runClassification(embeddings);
if (aid) {
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");

View File

@ -1,179 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingPath = "./model/embedding_original.onnx";
const testDataPath = "./data/filter/test1.jsonl";
// Initialize inference sessions
const [sessionClassifier, sessionEmbedding] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingPath),
]);
let tokenizer: PreTrainedTokenizer;
// Initialize the tokenizer
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// New embedding generation function (using ONNX)
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// Build input parameters
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// Prepare ONNX inputs
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// Run inference
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// Classification inference function
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// Metric computation function
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
speed: string;
} {
// Print the indices where label and prediction disagree
const arr = [];
for (let i = 0; i < labels.length; i++) {
if (labels[i] !== predictions[i] && predictions[i] == 0) {
arr.push([i + 1, labels[i], predictions[i]]);
}
}
console.log(arr);
// Initialize the confusion matrix
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// Fill the matrix
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// Compute the metrics
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
// Compute Class 0 precision
const class0TP = matrix[0][0];
const class0FP = matrix.flatMap((row, i) => i === 0 ? [] : [row[0]]).reduce((a, b) => a + b, 0);
const class0Precision = class0TP / (class0TP + class0FP) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
"Class 0 Prec": class0Precision,
};
}
// Refactored evaluation function
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
}> {
const data = await Deno.readTextFile(testDataPath);
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// Main function
async function main() {
await loadTokenizer();
const metrics = await evaluateModel(sessionEmbedding);
console.log("Model Metrics:");
console.table(metrics);
}
await main();
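
The removed benchmark micro-averages precision and recall over the confusion matrix; a compact worked example of that arithmetic (toy labels, not data from the deleted script):

const labels = [0, 0, 1, 1, 1, 0];
const preds = [0, 1, 1, 1, 0, 0];

// Confusion matrix, matrix[trueLabel][predicted]:
// [[2, 1],
//  [1, 2]]
// Micro-averaged: totalTP = 2 + 2 = 4, totalFP = totalFN = 1 + 1 = 2.
const accuracy = labels.filter((l, i) => l === preds[i]).length / labels.length; // 4/6, about 0.667
const precision = 4 / (4 + 2); // about 0.667
const recall = 4 / (4 + 2); // about 0.667
const f1 = (2 * precision * recall) / (precision + recall); // about 0.667
console.log({ accuracy, precision, recall, f1 });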

View File

@ -6,8 +6,7 @@ export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {};
public models: { [key: string]: string } = {};
constructor() {
}
constructor() {}
public async init() {
const modelKeys = Object.keys(this.models);

View File

@ -1,171 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// Configuration parameters
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
const onnxEmbeddingOriginalPath = "./model/embedding_original.onnx";
const onnxEmbeddingQuantizedPath = "./model/embedding_original.onnx";
// Initialize inference sessions
const [sessionClassifier, sessionEmbeddingOriginal, sessionEmbeddingQuantized] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
ort.InferenceSession.create(onnxEmbeddingQuantizedPath),
]);
let tokenizer: PreTrainedTokenizer;
// Initialize the tokenizer
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// New embedding generation function (using ONNX)
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// Build input parameters
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// Prepare ONNX inputs
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// Run inference
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// Classification inference function
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 4, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// Metric computation function
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
speed: string;
} {
// Initialize the confusion matrix
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// Fill the matrix
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// Compute the metrics
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
};
}
// Refactored evaluation function
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
}> {
const data = await Deno.readTextFile("./data/filter/test1.jsonl");
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
sample.author_info,
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// Main function
async function main() {
await loadTokenizer();
// Evaluate the original model
const originalMetrics = await evaluateModel(sessionEmbeddingOriginal);
console.log("Original Model Metrics:");
console.table(originalMetrics);
// Evaluate the quantized model
const quantizedMetrics = await evaluateModel(sessionEmbeddingQuantized);
console.log("Quantized Model Metrics:");
console.table(quantizedMetrics);
}
await main();

View File

@ -1,11 +1,28 @@
import { Job } from "bullmq";
import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { lockManager } from "mq/lockManager.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { getLatestVideoSnapshot } from "db/snapshot.ts";
import { HOUR, MINUTE } from "@core/const/time.ts";
import { MINUTE } from "@core/const/time.ts";
import { sql } from "@core/db/dbNew";
function getNextSaturdayMidnightTimestamp(): number {
const now = new Date();
const currentDay = now.getDay();
let daysUntilNextSaturday = (6 - currentDay + 7) % 7;
if (daysUntilNextSaturday === 0) {
daysUntilNextSaturday = 7;
}
const nextSaturday = new Date(now);
nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday);
nextSaturday.setHours(0, 0, 0, 0);
return nextSaturday.getTime();
}
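
A quick check of the weekday arithmetic above (Date.prototype.getDay() returns 0 for Sunday through 6 for Saturday, so (6 - currentDay + 7) % 7 measures the distance to Saturday, bumped to a full week when today is already Saturday):

// currentDay -> daysUntilNextSaturday
// 0 (Sun) -> 6, 1 (Mon) -> 5, ..., 5 (Fri) -> 1, 6 (Sat) -> 0, forced to 7
for (let day = 0; day <= 6; day++) {
	let days = (6 - day + 7) % 7;
	if (days === 0) days = 7;
	console.log(day, days);
}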
export const archiveSnapshotsWorker = async (_job: Job) => {
try {
const startedAt = Date.now();
@ -14,21 +31,22 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
return;
}
await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
const aids = await getAllVideosWithoutActiveSnapshotSchedule(sql);
const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "archive");
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = 168;
const nextSatMidnight = getNextSaturdayMidnightTimestamp();
const interval = nextSatMidnight - now;
logger.log(
`Scheduled archive snapshot for aid ${aid} in ${(interval / (60 * MINUTE)).toFixed(1)} hours.`,
"mq",
"fn:archiveSnapshotsWorker"
);
const targetTime = lastSnapshotedAt + interval * HOUR;
const targetTime = lastSnapshotedAt + interval;
await scheduleSnapshot(sql, aid, "archive", targetTime);
if (now - startedAt > 250 * MINUTE) {
if (now - startedAt > 30 * MINUTE) {
return;
}
}

View File

@ -3,7 +3,7 @@ import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "
import Akari from "ml/akari.ts";
import { ClassifyVideoQueue } from "mq/index.ts";
import logger from "@core/log/logger.ts";
import { lockManager } from "mq/lockManager.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { aidExistsInSongs } from "db/songs.ts";
import { insertIntoSongs } from "mq/task/collectSongs.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
@ -34,7 +34,7 @@ export const classifyVideoWorker = async (job: Job) => {
await job.updateData({
...job.data,
label: label,
label: label
});
return 0;
@ -46,19 +46,19 @@ export const classifyVideosWorker = async () => {
return;
}
await lockManager.acquireLock("classifyVideos");
await lockManager.acquireLock("classifyVideos", 5 * 60);
const videos = await getUnlabelledVideos(sql);
logger.log(`Found ${videos.length} unlabelled videos`);
let i = 0;
const startTime = new Date().getTime();
for (const aid of videos) {
if (i > 200) {
const now = new Date().getTime();
if (now - startTime > 4.2 * MINUTE) {
await lockManager.releaseLock("classifyVideos");
return 10000 + i;
return 1;
}
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
i++;
}
await lockManager.releaseLock("classifyVideos");
return 0;
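
The worker is now bounded by wall-clock time instead of an item count; the pattern in isolation (names invented; 4.2 minutes leaves headroom under the 5-minute lock TTL acquired above):

const startTime = Date.now();
for (const item of workItems) { // workItems: whatever the worker iterates over
	if (Date.now() - startTime > 4.2 * MINUTE) break; // bail out before the 5-min lock expires
	await processItem(item); // hypothetical per-item work
}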

View File

@ -1,7 +1,7 @@
import { Job } from "bullmq";
import { collectSongs } from "mq/task/collectSongs.ts";
export const collectSongsWorker = async (_job: Job): Promise<void> =>{
export const collectSongsWorker = async (_job: Job): Promise<void> => {
await collectSongs();
return;
}
};

View File

@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
if (eta > 144) continue;
const now = Date.now();
const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 1 * HOUR;
const minInterval = 1 * SECOND;
const maxInterval = 1.2 * HOUR;
const minInterval = 2 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay;
await scheduleSnapshot(sql, aid, "milestone", targetTime);
@ -25,5 +25,5 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
}
} catch (e) {
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
};
}
}
};

View File

@ -1,10 +1,10 @@
import { Job } from "bullmq";
import { getLatestVideoSnapshot } from "db/snapshot.ts";
import { truncate } from "utils/truncate.ts";
import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, WEEK } from "@core/const/time.ts";
import { lockManager } from "mq/lockManager.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { getRegularSnapshotInterval } from "mq/task/regularSnapshotInterval.ts";
import { sql } from "@core/db/dbNew.ts";
@ -17,7 +17,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =
}
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
const aids = await getVideosWithoutActiveSnapshotSchedule(sql);
const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal");
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);

View File

@ -2,6 +2,6 @@ import { sql } from "@core/db/dbNew";
import { Job } from "bullmq";
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> =>{
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
await queueLatestVideos(sql);
}
};

View File

@ -10,4 +10,4 @@ export const getVideoInfoWorker = async (job: Job): Promise<void> => {
return;
}
await insertVideoInfo(sql, aid);
}
};

View File

@ -5,15 +5,15 @@ import {
getBulkSnapshotsInNextSecond,
getSnapshotsInNextSecond,
setSnapshotStatus,
videoHasProcessingSchedule,
videoHasProcessingSchedule
} from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { SnapshotQueue } from "mq/index.ts";
import { sql } from "@core/db/dbNew";
const priorityMap: { [key: string]: number } = {
"milestone": 1,
"normal": 3,
milestone: 1,
normal: 3
};
export const bulkSnapshotTickWorker = async (_job: Job) => {
@ -35,12 +35,16 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
created_at: schedule.created_at,
started_at: schedule.started_at,
finished_at: schedule.finished_at,
status: schedule.status,
status: schedule.status
};
});
await SnapshotQueue.add("bulkSnapshotVideo", {
schedules: schedulesData,
}, { priority: 3 });
await SnapshotQueue.add(
"bulkSnapshotVideo",
{
schedules: schedulesData
},
{ priority: 3 }
);
}
return `OK`;
} catch (e) {
@ -61,11 +65,15 @@ export const snapshotTickWorker = async (_job: Job) => {
}
const aid = Number(schedule.aid);
await setSnapshotStatus(sql, schedule.id, "processing");
await SnapshotQueue.add("snapshotVideo", {
await SnapshotQueue.add(
"snapshotVideo",
{
aid: Number(aid),
id: Number(schedule.id),
type: schedule.type ?? "normal",
}, { priority });
type: schedule.type ?? "normal"
},
{ priority }
);
}
return `OK`;
} catch (e) {
@ -76,5 +84,5 @@ export const snapshotTickWorker = async (_job: Job) => {
export const closetMilestone = (views: number) => {
if (views < 100000) return 100000;
if (views < 1000000) return 1000000;
return 10000000;
return Math.ceil(views / 1000000) * 1000000;
};
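
closetMilestone now follows the same next-million rule as the SQL earlier in this diff; a few spot checks (inputs invented):

closetMilestone(42_000); // 100_000: below the first milestone
closetMilestone(950_000); // 1_000_000
closetMilestone(2_340_000); // 3_000_000, i.e. Math.ceil(2.34) * 1_000_000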

View File

@ -1,19 +1,19 @@
import { Job } from "bullmq";
import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
import { getLatestSnapshot, scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, SECOND } from "@core/const/time.ts";
import { lockManager } from "mq/lockManager.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { getSongsPublihsedAt } from "db/songs.ts";
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
import { NetSchedulerError } from "@core/net/delegate.ts";
import { sql } from "@core/db/dbNew.ts";
import { closetMilestone } from "./snapshotTick.ts";
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
"new": "snapshotMilestoneVideo",
milestone: "snapshotMilestoneVideo",
normal: "snapshotVideo",
new: "snapshotMilestoneVideo"
};
export const snapshotVideoWorker = async (job: Job): Promise<void> => {
@ -22,6 +22,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const latestSnapshot = await getLatestSnapshot(sql, aid);
try {
const exists = await snapshotScheduleExists(sql, id);
if (!exists) {
@ -32,7 +33,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn(
`Video ${aid} has status ${status} in the database. Abort snapshotting.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
"fn:dispatchRegularSnapshotsWorker"
);
return;
}
@ -44,7 +45,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn(
`Bilibili returned status ${status} when snapshotting for ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
"fn:dispatchRegularSnapshotsWorker"
);
return;
}
@ -52,7 +53,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(sql, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
const viewsPerHour = (stat.views / timeSincePublished) * HOUR;
if (timeSincePublished > 48 * HOUR) {
return;
}
@ -72,46 +73,41 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
await scheduleSnapshot(sql, aid, type, Date.now() + intervalMins * MINUTE, true);
}
if (type !== "milestone") return;
const alreadyAchievedMilestone = stat.views > closetMilestone(latestSnapshot.views);
if (alreadyAchievedMilestone) {
return;
}
const eta = await getAdjustedShortTermETA(sql, aid);
if (eta > 144) {
const etaHoursString = eta.toFixed(2) + " hrs";
logger.warn(
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
"fn:snapshotVideoWorker"
);
return;
}
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(sql, aid, type, targetTime);
await setSnapshotStatus(sql, id, "completed");
return;
}
catch (e) {
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
logger.warn(`No available proxy for aid ${job.data.aid}.`, "mq", "fn:snapshotVideoWorker");
await setSnapshotStatus(sql, id, "no_proxy");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval, false, true);
return;
}
else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
} else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
logger.warn(
`Failed to proxy request for aid ${job.data.aid}: ${e.message}`,
"mq",
"fn:takeSnapshotForVideoWorker",
"fn:snapshotVideoWorker"
);
await setSnapshotStatus(sql, id, "failed");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
}
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
logger.error(e as Error, "mq", "fn:snapshotVideoWorker");
await setSnapshotStatus(sql, id, "failed");
}
finally {
await lockManager.releaseLock("dispatchRegularSnapshots");
};
return;
};

View File

@ -3,7 +3,7 @@ import {
bulkScheduleSnapshot,
bulkSetSnapshotStatus,
scheduleSnapshot,
snapshotScheduleExists,
snapshotScheduleExists
} from "db/snapshotSchedule.ts";
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
import logger from "@core/log/logger.ts";
@ -55,7 +55,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
${shares},
${favorites}
)
`
`;
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
}
@ -72,13 +72,16 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for bulk request now.`,
"mq",
"fn:takeBulkSnapshotForVideosWorker",
);
logger.warn(`No available proxy for bulk request now.`, "mq", "fn:takeBulkSnapshotForVideosWorker");
await bulkSetSnapshotStatus(sql, ids, "no_proxy");
await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
await bulkScheduleSnapshot(
sql,
aidsToFetch,
"normal",
Date.now() + 20 * MINUTE * Math.random(),
false,
true
);
return;
}
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");

View File

@ -62,8 +62,8 @@ export async function initMQ() {
});
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
every: 6 * HOUR,
immediately: true
every: 2 * HOUR,
immediately: false
});
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {

View File

@ -2,7 +2,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db
import { truncate } from "utils/truncate.ts";
import { closetMilestone } from "./exec/snapshotTick.ts";
import { HOUR, MINUTE } from "@core/const/time.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
@ -12,13 +12,12 @@ const getFactor = (x: number) => {
const c = 100;
const u = 0.601;
const g = 455;
if (x>g) {
return log(b/log(x+1),a);
if (x > g) {
return log(b / log(x + 1), a);
} else {
return log(b / log(x + c), a) + u;
}
else {
return log(b/log(x+c),a)+u;
}
}
};
/*
* Returns the minimum ETA in hours for the next snapshot
@ -34,7 +33,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => {
if (!snapshotsEnough) return 0;
const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;

View File

@ -3,7 +3,7 @@ import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
import logger from "@core/log/logger.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts";
import { MINUTE } from "@core/const/time.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export async function collectSongs() {
const aids = await getNotCollectedSongs(sql);
@ -25,5 +25,5 @@ export async function insertIntoSongs(sql: Psql, aid: number) {
(SELECT duration FROM bilibili_metadata WHERE aid = ${aid})
)
ON CONFLICT DO NOTHING
`
`;
}

View File

@ -4,7 +4,7 @@ import logger from "@core/log/logger.ts";
import { ClassifyVideoQueue } from "mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "../../db/bilibili_metadata.ts";
import { HOUR, SECOND } from "@core/const/time.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export async function insertVideoInfo(sql: Psql, aid: number) {
const videoExists = await videoExistsInAllData(sql, aid);
@ -18,9 +18,9 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
const bvid = data.View.bvid;
const desc = data.View.desc;
const uid = data.View.owner.mid;
const tags = data.Tags
.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
.map((tag) => tag.tag_name).join(",");
const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
.map((tag) => tag.tag_name)
.join(",");
const title = data.View.title;
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
const duration = data.View.duration;
@ -55,7 +55,7 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
${stat.share},
${stat.favorite}
)
`
`;
logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
await ClassifyVideoQueue.add("classifyVideo", { aid });

View File

@ -1,6 +1,6 @@
import { getVideoInfo } from "@core/net/getVideoInfo.ts";
import logger from "@core/log/logger.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export interface SnapshotNumber {
time: number;
@ -24,11 +24,7 @@ export interface SnapshotNumber {
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function insertVideoSnapshot(
sql: Psql,
aid: number,
task: string,
): Promise<number | SnapshotNumber> {
export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise<number | SnapshotNumber> {
const data = await getVideoInfo(aid, task);
if (typeof data == "number") {
return data;
@ -45,7 +41,7 @@ export async function insertVideoSnapshot(
await sql`
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites})
`
`;
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
@ -58,6 +54,6 @@ export async function insertVideoSnapshot(
coins,
shares,
favorites,
time,
time
};
}

View File

@ -4,11 +4,9 @@ import { sleep } from "utils/sleep.ts";
import { SECOND } from "@core/const/time.ts";
import logger from "@core/log/logger.ts";
import { LatestVideosQueue } from "mq/index.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export async function queueLatestVideos(
sql: Psql,
): Promise<number | null> {
export async function queueLatestVideos(sql: Psql): Promise<number | null> {
let page = 1;
let i = 0;
const videosFound = new Set();
@ -26,14 +24,18 @@ export async function queueLatestVideos(
if (videoExists) {
continue;
}
await LatestVideosQueue.add("getVideoInfo", { aid }, {
await LatestVideosQueue.add(
"getVideoInfo",
{ aid },
{
delay,
attempts: 100,
backoff: {
type: "fixed",
delay: SECOND * 5,
},
});
delay: SECOND * 5
}
}
);
videosFound.add(aid);
allExists = false;
delay += Math.random() * SECOND * 1.5;
@ -42,7 +44,7 @@ export async function queueLatestVideos(
logger.log(
`Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`,
"net",
"fn:queueLatestVideos()",
"fn:queueLatestVideos()"
);
if (allExists) {
return 0;

View File

@ -1,6 +1,6 @@
import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
import { HOUR } from "@core/const/time.ts";
import type { Psql } from "global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => {
const now = Date.now();
@ -14,7 +14,7 @@ export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => {
if (hoursDiff < 8) return 24;
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
if (viewsDiff === 0) return 72;
const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
const speedPerDay = (viewsDiff / (hoursDiff + 0.001)) * 24;
if (speedPerDay < 6) return 36;
if (speedPerDay < 120) return 24;
if (speedPerDay < 320) return 12;
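
A worked pass through the heuristic above (numbers invented): 50 new views over 20 hours gives a daily speed of roughly 60, which falls in the [6, 120) band, so the next regular snapshot lands 24 hours out.

const speedPerDay = (50 / (20 + 0.001)) * 24; // about 60 views/day -> 24 h interval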

View File

@ -2,11 +2,7 @@ import { sql } from "@core/db/dbNew";
import logger from "@core/log/logger.ts";
export async function removeAllTimeoutSchedules() {
logger.log(
"Too many timeout schedules, directly removing these schedules...",
"mq",
"fn:scheduleCleanupWorker",
);
logger.log("Too many timeout schedules, directly removing these schedules...", "mq", "fn:scheduleCleanupWorker");
return await sql`
DELETE FROM snapshot_schedule
WHERE status IN ('pending', 'processing')

View File

@ -1,5 +1,6 @@
{
"name": "crawler",
"version": "1.3.0",
"scripts": {
"test": "bun --env-file=.env.test run vitest",
"worker:main": "bun run ./src/worker.ts",
@ -7,7 +8,8 @@
"worker:filter": "bun run ./build/filterWorker.js",
"adder": "bun run ./src/jobAdder.ts",
"bullui": "bun run ./src/bullui.ts",
"all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run bullui' 'bun run worker:filter'"
"all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'",
"format": "prettier --write ."
},
"devDependencies": {
"concurrently": "^9.1.2"
@ -19,6 +21,7 @@
"bullmq": "^5.52.1",
"express": "^5.1.0",
"ioredis": "^5.6.1",
"postgres": "^3.4.5",
"onnxruntime-node": "1.19.2"
}
}

View File

@ -6,7 +6,6 @@ await Bun.build({
target: "node"
});
const file = Bun.file("./build/filterWorker.js");
const code = await file.text();

View File

@ -11,9 +11,9 @@ createBullBoard({
queues: [
new BullMQAdapter(LatestVideosQueue),
new BullMQAdapter(ClassifyVideoQueue),
new BullMQAdapter(SnapshotQueue),
new BullMQAdapter(SnapshotQueue)
],
serverAdapter: serverAdapter,
serverAdapter: serverAdapter
});
const app = express();

View File

@ -3,7 +3,7 @@ import { redis } from "@core/db/redis.ts";
import logger from "@core/log/logger.ts";
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
import { WorkerError } from "mq/schema.ts";
import { lockManager } from "mq/lockManager.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import Akari from "ml/akari.ts";
const shutdown = async (signal: string) => {
@ -12,8 +12,8 @@ const shutdown = async (signal: string) => {
process.exit(0);
};
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on("SIGINT", () => shutdown("SIGINT"));
process.on("SIGTERM", () => shutdown("SIGTERM"));
await Akari.init();
@ -29,7 +29,7 @@ const filterWorker = new Worker(
break;
}
},
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } },
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }
);
filterWorker.on("active", () => {

View File

@ -10,11 +10,11 @@ import {
scheduleCleanupWorker,
snapshotTickWorker,
snapshotVideoWorker,
takeBulkSnapshotForVideosWorker,
takeBulkSnapshotForVideosWorker
} from "mq/exec/executors.ts";
import { redis } from "@core/db/redis.ts";
import logger from "@core/log/logger.ts";
import { lockManager } from "mq/lockManager.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { WorkerError } from "mq/schema.ts";
const releaseLockForJob = async (name: string) => {
@ -37,8 +37,8 @@ const shutdown = async (signal: string) => {
process.exit(0);
};
process.on('SIGINT', () => shutdown('SIGINT'));
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on("SIGINT", () => shutdown("SIGINT"));
process.on("SIGTERM", () => shutdown("SIGTERM"));
const latestVideoWorker = new Worker(
"latestVideos",
@ -58,8 +58,8 @@ const latestVideoWorker = new Worker(
connection: redis as ConnectionOptions,
concurrency: 6,
removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 },
},
removeOnFail: { count: 0 }
}
);
latestVideoWorker.on("active", () => {
@ -95,7 +95,7 @@ const snapshotWorker = new Worker(
break;
}
},
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } },
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }
);
snapshotWorker.on("error", (err) => {

View File

@ -56,26 +56,26 @@ const databasePreparationQuery = `
CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status);
CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type);
CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id);
`
`;
const cleanUpQuery = `
DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE;
DROP TABLE IF EXISTS "snapshot_schedule" CASCADE;
`
`;
async function testMocking() {
await sql.begin(async tx => {
await sql.begin(async (tx) => {
await tx.unsafe(cleanUpQuery).simple();
await tx.unsafe(databasePreparationQuery).simple();
await tx`
INSERT INTO snapshot_schedule
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')}
${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
`;
await tx`
ROLLBACK;
`
`;
await tx.unsafe(cleanUpQuery).simple();
return;
@ -83,26 +83,26 @@ async function testMocking() {
}
async function testBulkSetSnapshotStatus() {
return await sql.begin(async tx => {
return await sql.begin(async (tx) => {
await tx.unsafe(cleanUpQuery).simple();
await tx.unsafe(databasePreparationQuery).simple();
await tx`
INSERT INTO snapshot_schedule
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')}
${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
`;
const ids = [1, 2, 3];
await bulkSetSnapshotStatus(tx, ids, 'pending')
await bulkSetSnapshotStatus(tx, ids, "pending");
const rows = tx<{status: string}[]>`
const rows = tx<{ status: string }[]>`
SELECT status FROM snapshot_schedule WHERE id = 1;
`.execute();
await tx`
ROLLBACK;
`
`;
await tx.unsafe(cleanUpQuery).simple();
return rows;
@ -116,5 +116,5 @@ test("data mocking works", async () => {
test("bulkSetSnapshotStatus core logic works smoothly", async () => {
const rows = await testBulkSetSnapshotStatus();
expect(rows.every(item => item.status === 'pending')).toBe(true);
expect(rows.every((item) => item.status === "pending")).toBe(true);
});

View File

@ -1,6 +1,6 @@
export function formatTimestampToPsql(timestamp: number) {
const date = new Date(timestamp);
return date.toISOString().slice(0, 23).replace("T", " ");
return date.toISOString().slice(0, 23).replace("T", " ") + "+08";
}
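
With the appended zone marker, note that the digits still come from toISOString(), i.e. UTC; this pairs with call sites in this diff that pre-shift the timestamp by 8 hours, such as formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR). A spot check (input invented):

formatTimestampToPsql(Date.UTC(2025, 5, 10, 7, 3, 39));
// -> "2025-06-10 07:03:39.000+08"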
export function parseTimestampFromPsql(timestamp: string) {

Some files were not shown because too many files have changed in this diff.