Merge branch 'main' into ref/next

This commit is contained in:
alikia2x (寒寒) 2025-06-05 22:30:19 +08:00 committed by GitHub
commit 54a2de0a11
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
37 changed files with 232 additions and 222 deletions

View File

@ -3,6 +3,15 @@
"workspaces": { "workspaces": {
"": { "": {
"name": "cvsa", "name": "cvsa",
"dependencies": {
"postgres": "^3.4.5",
},
"devDependencies": {
"prettier": "^3.5.3",
"vite-tsconfig-paths": "^5.1.4",
"vitest": "^3.1.2",
"vitest-tsconfig-paths": "^3.4.1",
},
}, },
"packages/backend": { "packages/backend": {
"name": "@cvsa/backend", "name": "@cvsa/backend",

View File

@ -1,12 +1,21 @@
{ {
"name": "cvsa", "name": "cvsa",
"version": "2.13.22", "version": "2.13.22",
"private": true, "private": false,
"type": "module", "type": "module",
"workspaces": [ "workspaces": [
"packages/frontend", "packages/frontend",
"packages/core", "packages/core",
"packages/backend", "packages/backend",
"packages/crawler" "packages/crawler"
] ],
"dependencies": {
"postgres": "^3.4.5"
},
"devDependencies": {
"vite-tsconfig-paths": "^5.1.4",
"vitest": "^3.1.2",
"vitest-tsconfig-paths": "^3.4.1",
"prettier": "^3.5.3"
}
} }

View File

@ -30,9 +30,9 @@ export async function insertVideoLabel(sql: Psql, aid: number, label: number) {
} }
export async function getVideoInfoFromAllData(sql: Psql, aid: number) { export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
const rows = await sql<BiliVideoMetadataType[]>` const rows = await sql<AllDataType[]>`
SELECT * FROM bilibili_metadata WHERE aid = ${aid} SELECT * FROM bilibili_metadata WHERE aid = ${aid}
`; `;
const row = rows[0]; const row = rows[0];
let authorInfo = ""; let authorInfo = "";
if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) { if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) {

View File

@ -22,7 +22,7 @@ export async function getVideosNearMilestone(sql: Psql) {
return queryResult.map((row) => { return queryResult.map((row) => {
return { return {
...row, ...row,
aid: Number(row.aid), aid: Number(row.aid)
}; };
}); });
} }
@ -40,7 +40,7 @@ export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise<nu
return { return {
...row, ...row,
aid: Number(row.aid), aid: Number(row.aid),
time: new Date(row.time).getTime(), time: new Date(row.time).getTime()
}; };
})[0]; })[0];
} }

View File

@ -63,18 +63,6 @@ export async function snapshotScheduleExists(sql: Psql, id: number) {
return rows.length > 0; return rows.length > 0;
} }
export async function videoHasActiveSchedule(sql: Psql, aid: number) {
const rows = await sql<{ status: string }[]>`
SELECT status
FROM snapshot_schedule
WHERE aid = ${aid}
AND (status = 'pending'
OR status = 'processing'
)
`
return rows.length > 0;
}
export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) { export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) {
const rows = await sql<{ status: string }[]>` const rows = await sql<{ status: string }[]>`
SELECT status FROM snapshot_schedule SELECT status FROM snapshot_schedule
@ -91,7 +79,7 @@ export async function videoHasProcessingSchedule(sql: Psql, aid: number) {
FROM snapshot_schedule FROM snapshot_schedule
WHERE aid = ${aid} WHERE aid = ${aid}
AND status = 'processing' AND status = 'processing'
` `;
return rows.length > 0; return rows.length > 0;
} }
@ -102,7 +90,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n
WHERE aid = ANY(${aids}) WHERE aid = ANY(${aids})
AND status != 'processing' AND status != 'processing'
GROUP BY aid GROUP BY aid
` `;
return rows.map((row) => Number(row.aid)); return rows.map((row) => Number(row.aid));
} }
@ -292,23 +280,23 @@ export async function adjustSnapshotTime(
} }
export async function getSnapshotsInNextSecond(sql: Psql) { export async function getSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>` return sql<SnapshotScheduleType[]>`
SELECT * SELECT *
FROM snapshot_schedule FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal' WHERE started_at <= NOW() + INTERVAL '1 seconds'
ORDER BY AND status = 'pending'
CASE AND type != 'normal'
WHEN type = 'milestone' THEN 0 ORDER BY CASE
ELSE 1 WHEN type = 'milestone' THEN 0
END, ELSE 1
started_at END,
LIMIT 10; started_at
` LIMIT 10;
return rows; `;
} }
export async function getBulkSnapshotsInNextSecond(sql: Psql) { export async function getBulkSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>` return sql<SnapshotScheduleType[]>`
SELECT * SELECT *
FROM snapshot_schedule FROM snapshot_schedule
WHERE (started_at <= NOW() + INTERVAL '15 seconds') WHERE (started_at <= NOW() + INTERVAL '15 seconds')
@ -320,27 +308,33 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) {
END, END,
started_at started_at
LIMIT 1000; LIMIT 1000;
` `;
return rows;
} }
export async function setSnapshotStatus(sql: Psql, id: number, status: string) { export async function setSnapshotStatus(sql: Psql, id: number, status: string) {
return await sql` return sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id} UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ${id}
`; `;
} }
export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) { export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) {
return await sql` return sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids}) UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ANY (${ids})
`; `;
} }
export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) { export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) {
const rows = await sql<{ aid: string }[]>` const rows = await sql<{ aid: string }[]>`
SELECT s.aid SELECT s.aid
FROM songs s FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') LEFT JOIN snapshot_schedule ss ON
s.aid = ss.aid AND
(ss.status = 'pending' OR ss.status = 'processing') AND
ss.type = ${type}
WHERE ss.aid IS NULL WHERE ss.aid IS NULL
`; `;
return rows.map((r) => Number(r.aid)); return rows.map((r) => Number(r.aid));
@ -352,6 +346,6 @@ export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) {
FROM bilibili_metadata s FROM bilibili_metadata s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing') LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL WHERE ss.aid IS NULL
` `;
return rows.map((r) => Number(r.aid)); return rows.map((r) => Number(r.aid));
} }

View File

@ -2,7 +2,7 @@ import type { Psql } from "@core/db/psql.d.ts";
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts"; import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(sql: Psql) { export async function getNotCollectedSongs(sql: Psql) {
const rows = await sql<{ aid: number }[]>` const rows = await sql<{ aid: number }[]>`
SELECT lr.aid SELECT lr.aid
FROM labelling_result lr FROM labelling_result lr
WHERE lr.label != 0 WHERE lr.label != 0
@ -12,28 +12,28 @@ export async function getNotCollectedSongs(sql: Psql) {
WHERE s.aid = lr.aid WHERE s.aid = lr.aid
); );
`; `;
return rows.map((row) => row.aid); return rows.map((row) => row.aid);
} }
export async function aidExistsInSongs(sql: Psql, aid: number) { export async function aidExistsInSongs(sql: Psql, aid: number) {
const rows = await sql<{ exists: boolean }[]>` const rows = await sql<{ exists: boolean }[]>`
SELECT EXISTS ( SELECT EXISTS (
SELECT 1 SELECT 1
FROM songs FROM songs
WHERE aid = ${aid} WHERE aid = ${aid}
); );
`; `;
return rows[0].exists; return rows[0].exists;
} }
export async function getSongsPublihsedAt(sql: Psql, aid: number) { export async function getSongsPublihsedAt(sql: Psql, aid: number) {
const rows = await sql<{ published_at: string }[]>` const rows = await sql<{ published_at: string }[]>`
SELECT published_at SELECT published_at
FROM songs FROM songs
WHERE aid = ${aid}; WHERE aid = ${aid};
`; `;
if (rows.length === 0) { if (rows.length === 0) {
return null; return null;
} }
return parseTimestampFromPsql(rows[0].published_at); return parseTimestampFromPsql(rows[0].published_at);
} }

View File

@ -16,8 +16,8 @@ class AkariProto extends AIManager {
constructor() { constructor() {
super(); super();
this.models = { this.models = {
"classifier": onnxClassifierPath, classifier: onnxClassifierPath,
"embedding": onnxEmbeddingPath, embedding: onnxEmbeddingPath
}; };
} }
@ -55,7 +55,7 @@ class AkariProto extends AIManager {
const { input_ids } = await tokenizer(texts, { const { input_ids } = await tokenizer(texts, {
add_special_tokens: false, add_special_tokens: false,
return_tensor: false, return_tensor: false
}); });
const cumsum = (arr: number[]): number[] => const cumsum = (arr: number[]): number[] =>
@ -66,9 +66,9 @@ class AkariProto extends AIManager {
const inputs = { const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [ input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length, flattened_input_ids.length
]), ]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]), offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length])
}; };
const { embeddings } = await session.run(inputs); const { embeddings } = await session.run(inputs);
@ -77,21 +77,14 @@ class AkariProto extends AIManager {
private async runClassification(embeddings: number[]): Promise<number[]> { private async runClassification(embeddings: number[]): Promise<number[]> {
const session = this.getModelSession("classifier"); const session = this.getModelSession("classifier");
const inputTensor = new ort.Tensor( const inputTensor = new ort.Tensor(Float32Array.from(embeddings), [1, 3, 1024]);
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await session.run({ channel_features: inputTensor }); const { logits } = await session.run({ channel_features: inputTensor });
return this.softmax(logits.data as Float32Array); return this.softmax(logits.data as Float32Array);
} }
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> { public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
const embeddings = await this.getJinaEmbeddings1024([ const embeddings = await this.getJinaEmbeddings1024([title, description, tags]);
title,
description,
tags,
]);
const probabilities = await this.runClassification(embeddings); const probabilities = await this.runClassification(embeddings);
if (aid) { if (aid) {
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml"); logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");

View File

@ -1 +1 @@
export const AkariModelVersion = "3.17"; export const AkariModelVersion = "3.17";

View File

@ -6,8 +6,7 @@ export class AIManager {
public sessions: { [key: string]: ort.InferenceSession } = {}; public sessions: { [key: string]: ort.InferenceSession } = {};
public models: { [key: string]: string } = {}; public models: { [key: string]: string } = {};
constructor() { constructor() {}
}
public async init() { public async init() {
const modelKeys = Object.keys(this.models); const modelKeys = Object.keys(this.models);

View File

@ -3,9 +3,26 @@ import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
import { lockManager } from "@core/mq/lockManager.ts"; import { lockManager } from "@core/mq/lockManager.ts";
import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { getLatestVideoSnapshot } from "db/snapshot.ts";
import { HOUR, MINUTE } from "@core/const/time.ts"; import { MINUTE } from "@core/const/time.ts";
import { sql } from "@core/db/dbNew"; import { sql } from "@core/db/dbNew";
function getNextSaturdayMidnightTimestamp(): number {
const now = new Date();
const currentDay = now.getDay();
let daysUntilNextSaturday = (6 - currentDay + 7) % 7;
if (daysUntilNextSaturday === 0) {
daysUntilNextSaturday = 7;
}
const nextSaturday = new Date(now);
nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday);
nextSaturday.setHours(0, 0, 0, 0);
return nextSaturday.getTime();
}
export const archiveSnapshotsWorker = async (_job: Job) => { export const archiveSnapshotsWorker = async (_job: Job) => {
try { try {
const startedAt = Date.now(); const startedAt = Date.now();
@ -20,15 +37,16 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
const latestSnapshot = await getLatestVideoSnapshot(sql, aid); const latestSnapshot = await getLatestVideoSnapshot(sql, aid);
const now = Date.now(); const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now; const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = 168; const nextSatMidnight = getNextSaturdayMidnightTimestamp();
const interval = nextSatMidnight - now;
logger.log( logger.log(
`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, `Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
"mq", "mq",
"fn:archiveSnapshotsWorker" "fn:archiveSnapshotsWorker"
); );
const targetTime = lastSnapshotedAt + interval * HOUR; const targetTime = lastSnapshotedAt + interval;
await scheduleSnapshot(sql, aid, "archive", targetTime); await scheduleSnapshot(sql, aid, "archive", targetTime);
if (now - startedAt > 250 * MINUTE) { if (now - startedAt > 30 * MINUTE) {
return; return;
} }
} }

View File

@ -34,7 +34,7 @@ export const classifyVideoWorker = async (job: Job) => {
await job.updateData({ await job.updateData({
...job.data, ...job.data,
label: label, label: label
}); });
return 0; return 0;
@ -46,19 +46,19 @@ export const classifyVideosWorker = async () => {
return; return;
} }
await lockManager.acquireLock("classifyVideos"); await lockManager.acquireLock("classifyVideos", 5 * 60);
const videos = await getUnlabelledVideos(sql); const videos = await getUnlabelledVideos(sql);
logger.log(`Found ${videos.length} unlabelled videos`); logger.log(`Found ${videos.length} unlabelled videos`);
let i = 0; const startTime = new Date().getTime();
for (const aid of videos) { for (const aid of videos) {
if (i > 200) { const now = new Date().getTime();
if (now - startTime > 4.2 * MINUTE) {
await lockManager.releaseLock("classifyVideos"); await lockManager.releaseLock("classifyVideos");
return 10000 + i; return 1;
} }
await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) }); await ClassifyVideoQueue.add("classifyVideo", { aid: Number(aid) });
i++;
} }
await lockManager.releaseLock("classifyVideos"); await lockManager.releaseLock("classifyVideos");
return 0; return 0;

View File

@ -1,7 +1,7 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { collectSongs } from "mq/task/collectSongs.ts"; import { collectSongs } from "mq/task/collectSongs.ts";
export const collectSongsWorker = async (_job: Job): Promise<void> =>{ export const collectSongsWorker = async (_job: Job): Promise<void> => {
await collectSongs(); await collectSongs();
return; return;
} };

View File

@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
if (eta > 144) continue; if (eta > 144) continue;
const now = Date.now(); const now = Date.now();
const scheduledNextSnapshotDelay = eta * HOUR; const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 1 * HOUR; const maxInterval = 1.2 * HOUR;
const minInterval = 1 * SECOND; const minInterval = 2 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay; const targetTime = now + delay;
await scheduleSnapshot(sql, aid, "milestone", targetTime); await scheduleSnapshot(sql, aid, "milestone", targetTime);
@ -25,5 +25,5 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
} }
} catch (e) { } catch (e) {
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
}; }
} };

View File

@ -1,7 +1,7 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { getLatestVideoSnapshot } from "db/snapshot.ts"; import { getLatestVideoSnapshot } from "db/snapshot.ts";
import { truncate } from "utils/truncate.ts"; import { truncate } from "utils/truncate.ts";
import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts"; import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, WEEK } from "@core/const/time.ts"; import { HOUR, MINUTE, WEEK } from "@core/const/time.ts";
import { lockManager } from "@core/mq/lockManager.ts"; import { lockManager } from "@core/mq/lockManager.ts";
@ -17,7 +17,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =
} }
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
const aids = await getVideosWithoutActiveSnapshotSchedule(sql); const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal");
for (const rawAid of aids) { for (const rawAid of aids) {
const aid = Number(rawAid); const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(sql, aid); const latestSnapshot = await getLatestVideoSnapshot(sql, aid);

View File

@ -7,4 +7,4 @@ export * from "./dispatchMilestoneSnapshots.ts";
export * from "./dispatchRegularSnapshots.ts"; export * from "./dispatchRegularSnapshots.ts";
export * from "./snapshotVideo.ts"; export * from "./snapshotVideo.ts";
export * from "./scheduleCleanup.ts"; export * from "./scheduleCleanup.ts";
export * from "./snapshotTick.ts"; export * from "./snapshotTick.ts";

View File

@ -2,6 +2,6 @@ import { sql } from "@core/db/dbNew";
import { Job } from "bullmq"; import { Job } from "bullmq";
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> =>{ export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
await queueLatestVideos(sql); await queueLatestVideos(sql);
} };

View File

@ -10,4 +10,4 @@ export const getVideoInfoWorker = async (job: Job): Promise<void> => {
return; return;
} }
await insertVideoInfo(sql, aid); await insertVideoInfo(sql, aid);
} };

View File

@ -5,15 +5,15 @@ import {
getBulkSnapshotsInNextSecond, getBulkSnapshotsInNextSecond,
getSnapshotsInNextSecond, getSnapshotsInNextSecond,
setSnapshotStatus, setSnapshotStatus,
videoHasProcessingSchedule, videoHasProcessingSchedule
} from "db/snapshotSchedule.ts"; } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
import { SnapshotQueue } from "mq/index.ts"; import { SnapshotQueue } from "mq/index.ts";
import { sql } from "@core/db/dbNew"; import { sql } from "@core/db/dbNew";
const priorityMap: { [key: string]: number } = { const priorityMap: { [key: string]: number } = {
"milestone": 1, milestone: 1,
"normal": 3, normal: 3
}; };
export const bulkSnapshotTickWorker = async (_job: Job) => { export const bulkSnapshotTickWorker = async (_job: Job) => {
@ -35,12 +35,16 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
created_at: schedule.created_at, created_at: schedule.created_at,
started_at: schedule.started_at, started_at: schedule.started_at,
finished_at: schedule.finished_at, finished_at: schedule.finished_at,
status: schedule.status, status: schedule.status
}; };
}); });
await SnapshotQueue.add("bulkSnapshotVideo", { await SnapshotQueue.add(
schedules: schedulesData, "bulkSnapshotVideo",
}, { priority: 3 }); {
schedules: schedulesData
},
{ priority: 3 }
);
} }
return `OK`; return `OK`;
} catch (e) { } catch (e) {
@ -61,11 +65,15 @@ export const snapshotTickWorker = async (_job: Job) => {
} }
const aid = Number(schedule.aid); const aid = Number(schedule.aid);
await setSnapshotStatus(sql, schedule.id, "processing"); await setSnapshotStatus(sql, schedule.id, "processing");
await SnapshotQueue.add("snapshotVideo", { await SnapshotQueue.add(
aid: Number(aid), "snapshotVideo",
id: Number(schedule.id), {
type: schedule.type ?? "normal", aid: Number(aid),
}, { priority }); id: Number(schedule.id),
type: schedule.type ?? "normal"
},
{ priority }
);
} }
return `OK`; return `OK`;
} catch (e) { } catch (e) {

View File

@ -2,7 +2,6 @@ import { Job } from "bullmq";
import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts"; import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, SECOND } from "@core/const/time.ts"; import { HOUR, MINUTE, SECOND } from "@core/const/time.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts"; import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { getSongsPublihsedAt } from "db/songs.ts"; import { getSongsPublihsedAt } from "db/songs.ts";
@ -11,9 +10,9 @@ import { NetSchedulerError } from "@core/net/delegate.ts";
import { sql } from "@core/db/dbNew.ts"; import { sql } from "@core/db/dbNew.ts";
const snapshotTypeToTaskMap: { [key: string]: string } = { const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo", milestone: "snapshotMilestoneVideo",
"normal": "snapshotVideo", normal: "snapshotVideo",
"new": "snapshotMilestoneVideo", new: "snapshotMilestoneVideo"
}; };
export const snapshotVideoWorker = async (job: Job): Promise<void> => { export const snapshotVideoWorker = async (job: Job): Promise<void> => {
@ -32,7 +31,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn( logger.warn(
`Video ${aid} has status ${status} in the database. Abort snapshoting.`, `Video ${aid} has status ${status} in the database. Abort snapshoting.`,
"mq", "mq",
"fn:dispatchRegularSnapshotsWorker", "fn:dispatchRegularSnapshotsWorker"
); );
return; return;
} }
@ -44,7 +43,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn( logger.warn(
`Bilibili return status ${status} when snapshoting for ${aid}.`, `Bilibili return status ${status} when snapshoting for ${aid}.`,
"mq", "mq",
"fn:dispatchRegularSnapshotsWorker", "fn:dispatchRegularSnapshotsWorker"
); );
return; return;
} }
@ -52,7 +51,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
if (type === "new") { if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(sql, aid); const publihsedAt = await getSongsPublihsedAt(sql, aid);
const timeSincePublished = stat.time - publihsedAt!; const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR; const viewsPerHour = (stat.views / timeSincePublished) * HOUR;
if (timeSincePublished > 48 * HOUR) { if (timeSincePublished > 48 * HOUR) {
return; return;
} }
@ -78,40 +77,31 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn( logger.warn(
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`, `ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
"mq", "mq",
"fn:dispatchRegularSnapshotsWorker", "fn:snapshotVideoWorker"
); );
return;
} }
const now = Date.now(); const now = Date.now();
const targetTime = now + eta * HOUR; const targetTime = now + eta * HOUR;
await scheduleSnapshot(sql, aid, type, targetTime); await scheduleSnapshot(sql, aid, type, targetTime);
await setSnapshotStatus(sql, id, "completed"); await setSnapshotStatus(sql, id, "completed");
return; return;
} } catch (e) {
catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn( logger.warn(`No available proxy for aid ${job.data.aid}.`, "mq", "fn:snapshotVideoWorker");
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(sql, id, "no_proxy"); await setSnapshotStatus(sql, id, "no_proxy");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
return; return;
} } else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
logger.warn( logger.warn(
`Failed to proxy request for aid ${job.data.aid}: ${e.message}`, `Failed to proxy request for aid ${job.data.aid}: ${e.message}`,
"mq", "mq",
"fn:takeSnapshotForVideoWorker", "fn:snapshotVideoWorker"
); );
await setSnapshotStatus(sql, id, "failed"); await setSnapshotStatus(sql, id, "failed");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval); await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
} }
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); logger.error(e as Error, "mq", "fn:snapshotVideoWorker");
await setSnapshotStatus(sql, id, "failed"); await setSnapshotStatus(sql, id, "failed");
} }
finally {
await lockManager.releaseLock("dispatchRegularSnapshots");
};
return;
}; };

View File

@ -3,7 +3,7 @@ import {
bulkScheduleSnapshot, bulkScheduleSnapshot,
bulkSetSnapshotStatus, bulkSetSnapshotStatus,
scheduleSnapshot, scheduleSnapshot,
snapshotScheduleExists, snapshotScheduleExists
} from "db/snapshotSchedule.ts"; } from "db/snapshotSchedule.ts";
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts"; import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
@ -55,7 +55,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
${shares}, ${shares},
${favorites} ${favorites}
) )
` `;
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker"); logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
} }
@ -72,11 +72,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
return `DONE`; return `DONE`;
} catch (e) { } catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn( logger.warn(`No available proxy for bulk request now.`, "mq", "fn:takeBulkSnapshotForVideosWorker");
`No available proxy for bulk request now.`,
"mq",
"fn:takeBulkSnapshotForVideosWorker",
);
await bulkSetSnapshotStatus(sql, ids, "no_proxy"); await bulkSetSnapshotStatus(sql, ids, "no_proxy");
await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random()); await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
return; return;

View File

@ -2,13 +2,13 @@ import { Queue, ConnectionOptions } from "bullmq";
import { redis } from "@core/db/redis.ts"; import { redis } from "@core/db/redis.ts";
export const LatestVideosQueue = new Queue("latestVideos", { export const LatestVideosQueue = new Queue("latestVideos", {
connection: redis as ConnectionOptions connection: redis as ConnectionOptions
}); });
export const ClassifyVideoQueue = new Queue("classifyVideo", { export const ClassifyVideoQueue = new Queue("classifyVideo", {
connection: redis as ConnectionOptions connection: redis as ConnectionOptions
}); });
export const SnapshotQueue = new Queue("snapshot", { export const SnapshotQueue = new Queue("snapshot", {
connection: redis as ConnectionOptions connection: redis as ConnectionOptions
}); });

View File

@ -62,8 +62,8 @@ export async function initMQ() {
}); });
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", { await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
every: 6 * HOUR, every: 2 * HOUR,
immediately: true immediately: false
}); });
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", { await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {

View File

@ -33,7 +33,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => {
if (!snapshotsEnough) return 0; if (!snapshotsEnough) return 0;
const currentTimestamp = new Date().getTime(); const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const DELTA = 0.00001; const DELTA = 0.00001;
let minETAHours = Infinity; let minETAHours = Infinity;

View File

@ -25,5 +25,5 @@ export async function insertIntoSongs(sql: Psql, aid: number) {
(SELECT duration FROM bilibili_metadata WHERE aid = ${aid}) (SELECT duration FROM bilibili_metadata WHERE aid = ${aid})
) )
ON CONFLICT DO NOTHING ON CONFLICT DO NOTHING
` `;
} }

View File

@ -18,9 +18,9 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
const bvid = data.View.bvid; const bvid = data.View.bvid;
const desc = data.View.desc; const desc = data.View.desc;
const uid = data.View.owner.mid; const uid = data.View.owner.mid;
const tags = data.Tags const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) .map((tag) => tag.tag_name)
.map((tag) => tag.tag_name).join(","); .join(",");
const title = data.View.title; const title = data.View.title;
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
const duration = data.View.duration; const duration = data.View.duration;
@ -55,7 +55,7 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
${stat.share}, ${stat.share},
${stat.favorite} ${stat.favorite}
) )
` `;
logger.log(`Inserted video metadata for aid: ${aid}`, "mq"); logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
await ClassifyVideoQueue.add("classifyVideo", { aid }); await ClassifyVideoQueue.add("classifyVideo", { aid });

View File

@ -24,11 +24,7 @@ export interface SnapshotNumber {
* - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/ */
export async function insertVideoSnapshot( export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise<number | SnapshotNumber> {
sql: Psql,
aid: number,
task: string,
): Promise<number | SnapshotNumber> {
const data = await getVideoInfo(aid, task); const data = await getVideoInfo(aid, task);
if (typeof data == "number") { if (typeof data == "number") {
return data; return data;
@ -42,10 +38,10 @@ export async function insertVideoSnapshot(
const shares = data.stat.share; const shares = data.stat.share;
const favorites = data.stat.favorite; const favorites = data.stat.favorite;
await sql` await sql`
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites}) VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites})
` `;
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
@ -58,6 +54,6 @@ export async function insertVideoSnapshot(
coins, coins,
shares, shares,
favorites, favorites,
time, time
}; };
} }

View File

@ -6,9 +6,7 @@ import logger from "@core/log/logger.ts";
import { LatestVideosQueue } from "mq/index.ts"; import { LatestVideosQueue } from "mq/index.ts";
import type { Psql } from "@core/db/psql.d.ts"; import type { Psql } from "@core/db/psql.d.ts";
export async function queueLatestVideos( export async function queueLatestVideos(sql: Psql): Promise<number | null> {
sql: Psql,
): Promise<number | null> {
let page = 1; let page = 1;
let i = 0; let i = 0;
const videosFound = new Set(); const videosFound = new Set();
@ -26,14 +24,18 @@ export async function queueLatestVideos(
if (videoExists) { if (videoExists) {
continue; continue;
} }
await LatestVideosQueue.add("getVideoInfo", { aid }, { await LatestVideosQueue.add(
delay, "getVideoInfo",
attempts: 100, { aid },
backoff: { {
type: "fixed", delay,
delay: SECOND * 5, attempts: 100,
}, backoff: {
}); type: "fixed",
delay: SECOND * 5
}
}
);
videosFound.add(aid); videosFound.add(aid);
allExists = false; allExists = false;
delay += Math.random() * SECOND * 1.5; delay += Math.random() * SECOND * 1.5;
@ -42,7 +44,7 @@ export async function queueLatestVideos(
logger.log( logger.log(
`Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`, `Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`,
"net", "net",
"fn:queueLatestVideos()", "fn:queueLatestVideos()"
); );
if (allExists) { if (allExists) {
return 0; return 0;

View File

@ -1,6 +1,6 @@
import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts"; import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
import { HOUR } from "@core/const/time.ts"; import { HOUR } from "@core/const/time.ts";
import type { Psql } from "@core/db/psql"; import type { Psql } from "@core/db/psql.d.ts";
export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => { export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => {
const now = Date.now(); const now = Date.now();

View File

@ -2,14 +2,10 @@ import { sql } from "@core/db/dbNew";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
export async function removeAllTimeoutSchedules() { export async function removeAllTimeoutSchedules() {
logger.log( logger.log("Too many timeout schedules, directly removing these schedules...", "mq", "fn:scheduleCleanupWorker");
"Too many timeout schedules, directly removing these schedules...",
"mq",
"fn:scheduleCleanupWorker",
);
return await sql` return await sql`
DELETE FROM snapshot_schedule DELETE FROM snapshot_schedule
WHERE status IN ('pending', 'processing') WHERE status IN ('pending', 'processing')
AND started_at < NOW() - INTERVAL '30 minutes' AND started_at < NOW() - INTERVAL '30 minutes'
`; `;
} }

View File

@ -7,7 +7,8 @@
"worker:filter": "bun run ./build/filterWorker.js", "worker:filter": "bun run ./build/filterWorker.js",
"adder": "bun run ./src/jobAdder.ts", "adder": "bun run ./src/jobAdder.ts",
"bullui": "bun run ./src/bullui.ts", "bullui": "bun run ./src/bullui.ts",
"all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run bullui' 'bun run worker:filter'" "all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'",
"format": "prettier --write ."
}, },
"devDependencies": { "devDependencies": {
"concurrently": "^9.1.2" "concurrently": "^9.1.2"

View File

@ -3,13 +3,12 @@ import Bun from "bun";
await Bun.build({ await Bun.build({
entrypoints: ["./src/filterWorker.ts"], entrypoints: ["./src/filterWorker.ts"],
outdir: "./build", outdir: "./build",
target: "node" target: "node"
}); });
const file = Bun.file("./build/filterWorker.js"); const file = Bun.file("./build/filterWorker.js");
const code = await file.text(); const code = await file.text();
const modifiedCode = code.replaceAll("../bin/napi-v3/", "../../../node_modules/onnxruntime-node/bin/napi-v3/"); const modifiedCode = code.replaceAll("../bin/napi-v3/", "../../../node_modules/onnxruntime-node/bin/napi-v3/");
await Bun.write("./build/filterWorker.js", modifiedCode); await Bun.write("./build/filterWorker.js", modifiedCode);

View File

@ -11,9 +11,9 @@ createBullBoard({
queues: [ queues: [
new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(LatestVideosQueue),
new BullMQAdapter(ClassifyVideoQueue), new BullMQAdapter(ClassifyVideoQueue),
new BullMQAdapter(SnapshotQueue), new BullMQAdapter(SnapshotQueue)
], ],
serverAdapter: serverAdapter, serverAdapter: serverAdapter
}); });
const app = express(); const app = express();

View File

@ -7,13 +7,13 @@ import { lockManager } from "@core/mq/lockManager.ts";
import Akari from "ml/akari.ts"; import Akari from "ml/akari.ts";
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
logger.log(`${signal} Received: Shutting down workers...`, "mq"); logger.log(`${signal} Received: Shutting down workers...`, "mq");
await filterWorker.close(true); await filterWorker.close(true);
process.exit(0); process.exit(0);
}; };
process.on('SIGINT', () => shutdown('SIGINT')); process.on("SIGINT", () => shutdown("SIGINT"));
process.on('SIGTERM', () => shutdown('SIGTERM')); process.on("SIGTERM", () => shutdown("SIGTERM"));
await Akari.init(); await Akari.init();
@ -29,7 +29,7 @@ const filterWorker = new Worker(
break; break;
} }
}, },
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }, { connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }
); );
filterWorker.on("active", () => { filterWorker.on("active", () => {

View File

@ -10,7 +10,7 @@ import {
scheduleCleanupWorker, scheduleCleanupWorker,
snapshotTickWorker, snapshotTickWorker,
snapshotVideoWorker, snapshotVideoWorker,
takeBulkSnapshotForVideosWorker, takeBulkSnapshotForVideosWorker
} from "mq/exec/executors.ts"; } from "mq/exec/executors.ts";
import { redis } from "@core/db/redis.ts"; import { redis } from "@core/db/redis.ts";
import logger from "@core/log/logger.ts"; import logger from "@core/log/logger.ts";
@ -30,15 +30,15 @@ const releaseAllLocks = async () => {
}; };
const shutdown = async (signal: string) => { const shutdown = async (signal: string) => {
logger.log(`${signal} Received: Shutting down workers...`, "mq"); logger.log(`${signal} Received: Shutting down workers...`, "mq");
await releaseAllLocks(); await releaseAllLocks();
await latestVideoWorker.close(true); await latestVideoWorker.close(true);
await snapshotWorker.close(true); await snapshotWorker.close(true);
process.exit(0); process.exit(0);
}; };
process.on('SIGINT', () => shutdown('SIGINT')); process.on("SIGINT", () => shutdown("SIGINT"));
process.on('SIGTERM', () => shutdown('SIGTERM')); process.on("SIGTERM", () => shutdown("SIGTERM"));
const latestVideoWorker = new Worker( const latestVideoWorker = new Worker(
"latestVideos", "latestVideos",
@ -58,8 +58,8 @@ const latestVideoWorker = new Worker(
connection: redis as ConnectionOptions, connection: redis as ConnectionOptions,
concurrency: 6, concurrency: 6,
removeOnComplete: { count: 1440 }, removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 }, removeOnFail: { count: 0 }
}, }
); );
latestVideoWorker.on("active", () => { latestVideoWorker.on("active", () => {
@ -95,7 +95,7 @@ const snapshotWorker = new Worker(
break; break;
} }
}, },
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }, { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }
); );
snapshotWorker.on("error", (err) => { snapshotWorker.on("error", (err) => {

View File

@ -56,65 +56,65 @@ const databasePreparationQuery = `
CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status); CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status);
CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type); CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type);
CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id); CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id);
` `;
const cleanUpQuery = ` const cleanUpQuery = `
DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE; DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE;
DROP TABLE IF EXISTS "snapshot_schedule" CASCADE; DROP TABLE IF EXISTS "snapshot_schedule" CASCADE;
` `;
async function testMocking() { async function testMocking() {
await sql.begin(async tx => { await sql.begin(async (tx) => {
await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(cleanUpQuery).simple();
await tx.unsafe(databasePreparationQuery).simple(); await tx.unsafe(databasePreparationQuery).simple();
await tx` await tx`
INSERT INTO snapshot_schedule INSERT INTO snapshot_schedule
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
`; `;
await tx` await tx`
ROLLBACK; ROLLBACK;
` `;
await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(cleanUpQuery).simple();
return; return;
}); });
} }
async function testBulkSetSnapshotStatus() { async function testBulkSetSnapshotStatus() {
return await sql.begin(async tx => { return await sql.begin(async (tx) => {
await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(cleanUpQuery).simple();
await tx.unsafe(databasePreparationQuery).simple(); await tx.unsafe(databasePreparationQuery).simple();
await tx` await tx`
INSERT INTO snapshot_schedule INSERT INTO snapshot_schedule
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')} ${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
`; `;
const ids = [1, 2, 3]; const ids = [1, 2, 3];
await bulkSetSnapshotStatus(tx, ids, 'pending')
const rows = tx<{status: string}[]>` await bulkSetSnapshotStatus(tx, ids, "pending");
const rows = tx<{ status: string }[]>`
SELECT status FROM snapshot_schedule WHERE id = 1; SELECT status FROM snapshot_schedule WHERE id = 1;
`.execute(); `.execute();
await tx` await tx`
ROLLBACK; ROLLBACK;
` `;
await tx.unsafe(cleanUpQuery).simple(); await tx.unsafe(cleanUpQuery).simple();
return rows; return rows;
}); });
} }
test("data mocking works", async () => { test("data mocking works", async () => {
await testMocking(); await testMocking();
expect(() => {}).not.toThrowError(); expect(() => {}).not.toThrowError();
}); });
test("bulkSetSnapshotStatus core logic works smoothly", async () => { test("bulkSetSnapshotStatus core logic works smoothly", async () => {
const rows = await testBulkSetSnapshotStatus(); const rows = await testBulkSetSnapshotStatus();
expect(rows.every(item => item.status === 'pending')).toBe(true); expect(rows.every((item) => item.status === "pending")).toBe(true);
}); });

View File

@ -1,6 +1,6 @@
export function formatTimestampToPsql(timestamp: number) { export function formatTimestampToPsql(timestamp: number) {
const date = new Date(timestamp); const date = new Date(timestamp);
return date.toISOString().slice(0, 23).replace("T", " "); return date.toISOString().slice(0, 23).replace("T", " ") + "+08";
} }
export function parseTimestampFromPsql(timestamp: string) { export function parseTimestampFromPsql(timestamp: string) {

View File

@ -1,6 +1,6 @@
import { defineConfig } from "vitest/config"; import { defineConfig } from "vitest/config";
import tsconfigPaths from 'vite-tsconfig-paths' import tsconfigPaths from "vite-tsconfig-paths";
export default defineConfig({ export default defineConfig({
plugins: [tsconfigPaths()] plugins: [tsconfigPaths()]
}); });