update: schedule archive snapshots to next Saturday midnight
fix: no expiry was set when acquiring the lock for classifyVideos
ref: format
This commit is contained in:
parent
2b0497c83a
commit
1a20d5afe0
1
bun.lock
1
bun.lock
@ -7,6 +7,7 @@
|
|||||||
"postgres": "^3.4.5",
|
"postgres": "^3.4.5",
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
|
"prettier": "^3.5.3",
|
||||||
"vite-tsconfig-paths": "^5.1.4",
|
"vite-tsconfig-paths": "^5.1.4",
|
||||||
"vitest": "^3.1.2",
|
"vitest": "^3.1.2",
|
||||||
"vitest-tsconfig-paths": "^3.4.1",
|
"vitest-tsconfig-paths": "^3.4.1",
|
||||||
|
37
package.json
37
package.json
@ -1,20 +1,21 @@
|
|||||||
{
|
{
|
||||||
"name": "cvsa",
|
"name": "cvsa",
|
||||||
"version": "2.13.22",
|
"version": "2.13.22",
|
||||||
"private": false,
|
"private": false,
|
||||||
"type": "module",
|
"type": "module",
|
||||||
"workspaces": [
|
"workspaces": [
|
||||||
"packages/frontend",
|
"packages/frontend",
|
||||||
"packages/core",
|
"packages/core",
|
||||||
"packages/backend",
|
"packages/backend",
|
||||||
"packages/crawler"
|
"packages/crawler"
|
||||||
],
|
],
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"postgres": "^3.4.5"
|
"postgres": "^3.4.5"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"vite-tsconfig-paths": "^5.1.4",
|
"vite-tsconfig-paths": "^5.1.4",
|
||||||
"vitest": "^3.1.2",
|
"vitest": "^3.1.2",
|
||||||
"vitest-tsconfig-paths": "^3.4.1"
|
"vitest-tsconfig-paths": "^3.4.1",
|
||||||
}
|
"prettier": "^3.5.3"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,75 +3,75 @@ import { AllDataType, BiliUserType } from "@core/db/schema";
|
|||||||
import { AkariModelVersion } from "ml/const";
|
import { AkariModelVersion } from "ml/const";
|
||||||
|
|
||||||
export async function videoExistsInAllData(sql: Psql, aid: number) {
|
export async function videoExistsInAllData(sql: Psql, aid: number) {
|
||||||
const rows = await sql<{ exists: boolean }[]>`
|
const rows = await sql<{ exists: boolean }[]>`
|
||||||
SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = ${aid})
|
SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = ${aid})
|
||||||
`;
|
`;
|
||||||
return rows[0].exists;
|
return rows[0].exists;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function userExistsInBiliUsers(sql: Psql, uid: number) {
|
export async function userExistsInBiliUsers(sql: Psql, uid: number) {
|
||||||
const rows = await sql<{ exists: boolean }[]>`
|
const rows = await sql<{ exists: boolean }[]>`
|
||||||
SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = ${uid})
|
SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = ${uid})
|
||||||
`;
|
`;
|
||||||
return rows[0].exists;
|
return rows[0].exists;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getUnlabelledVideos(sql: Psql) {
|
export async function getUnlabelledVideos(sql: Psql) {
|
||||||
const rows = await sql<{ aid: number }[]>`
|
const rows = await sql<{ aid: number }[]>`
|
||||||
SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL
|
SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL
|
||||||
`;
|
`;
|
||||||
return rows.map((row) => row.aid);
|
return rows.map((row) => row.aid);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function insertVideoLabel(sql: Psql, aid: number, label: number) {
|
export async function insertVideoLabel(sql: Psql, aid: number, label: number) {
|
||||||
await sql`
|
await sql`
|
||||||
INSERT INTO labelling_result (aid, label, model_version) VALUES (${aid}, ${label}, ${AkariModelVersion}) ON CONFLICT (aid, model_version) DO NOTHING
|
INSERT INTO labelling_result (aid, label, model_version) VALUES (${aid}, ${label}, ${AkariModelVersion}) ON CONFLICT (aid, model_version) DO NOTHING
|
||||||
`;
|
`;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
|
export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
|
||||||
const rows = await sql<AllDataType[]>`
|
const rows = await sql<AllDataType[]>`
|
||||||
SELECT * FROM bilibili_metadata WHERE aid = ${aid}
|
SELECT * FROM bilibili_metadata WHERE aid = ${aid}
|
||||||
`;
|
`;
|
||||||
const row = rows[0];
|
const row = rows[0];
|
||||||
let authorInfo = "";
|
let authorInfo = "";
|
||||||
if (row.uid && await userExistsInBiliUsers(sql, row.uid)) {
|
if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) {
|
||||||
const userRows = await sql<BiliUserType[]>`
|
const userRows = await sql<BiliUserType[]>`
|
||||||
SELECT * FROM bilibili_user WHERE uid = ${row.uid}
|
SELECT * FROM bilibili_user WHERE uid = ${row.uid}
|
||||||
`;
|
`;
|
||||||
const userRow = userRows[0];
|
const userRow = userRows[0];
|
||||||
if (userRow) {
|
if (userRow) {
|
||||||
authorInfo = userRow.desc;
|
authorInfo = userRow.desc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return {
|
return {
|
||||||
title: row.title,
|
title: row.title,
|
||||||
description: row.description,
|
description: row.description,
|
||||||
tags: row.tags,
|
tags: row.tags,
|
||||||
author_info: authorInfo,
|
author_info: authorInfo
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getUnArchivedBiliUsers(sql: Psql) {
|
export async function getUnArchivedBiliUsers(sql: Psql) {
|
||||||
const rows = await sql<{ uid: number }[]>`
|
const rows = await sql<{ uid: number }[]>`
|
||||||
SELECT ad.uid
|
SELECT ad.uid
|
||||||
FROM bilibili_metadata ad
|
FROM bilibili_metadata ad
|
||||||
LEFT JOIN bilibili_user bu ON ad.uid = bu.uid
|
LEFT JOIN bilibili_user bu ON ad.uid = bu.uid
|
||||||
WHERE bu.uid IS NULL;
|
WHERE bu.uid IS NULL;
|
||||||
`;
|
`;
|
||||||
return rows.map((row) => row.uid);
|
return rows.map((row) => row.uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function setBiliVideoStatus(sql: Psql, aid: number, status: number) {
|
export async function setBiliVideoStatus(sql: Psql, aid: number, status: number) {
|
||||||
await sql`
|
await sql`
|
||||||
UPDATE bilibili_metadata SET status = ${status} WHERE aid = ${aid}
|
UPDATE bilibili_metadata SET status = ${status} WHERE aid = ${aid}
|
||||||
`;
|
`;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getBiliVideoStatus(sql: Psql, aid: number) {
|
export async function getBiliVideoStatus(sql: Psql, aid: number) {
|
||||||
const rows = await sql<{ status: number }[]>`
|
const rows = await sql<{ status: number }[]>`
|
||||||
SELECT status FROM bilibili_metadata WHERE aid = ${aid}
|
SELECT status FROM bilibili_metadata WHERE aid = ${aid}
|
||||||
`;
|
`;
|
||||||
if (rows.length === 0) return 0;
|
if (rows.length === 0) return 0;
|
||||||
return rows[0].status;
|
return rows[0].status;
|
||||||
}
|
}
|
@ -22,7 +22,7 @@ export async function getVideosNearMilestone(sql: Psql) {
|
|||||||
return queryResult.map((row) => {
|
return queryResult.map((row) => {
|
||||||
return {
|
return {
|
||||||
...row,
|
...row,
|
||||||
aid: Number(row.aid),
|
aid: Number(row.aid)
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@ -40,7 +40,7 @@ export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise<nu
|
|||||||
return {
|
return {
|
||||||
...row,
|
...row,
|
||||||
aid: Number(row.aid),
|
aid: Number(row.aid),
|
||||||
time: new Date(row.time).getTime(),
|
time: new Date(row.time).getTime()
|
||||||
};
|
};
|
||||||
})[0];
|
})[0];
|
||||||
}
|
}
|
||||||
|
@ -79,7 +79,7 @@ export async function videoHasProcessingSchedule(sql: Psql, aid: number) {
|
|||||||
FROM snapshot_schedule
|
FROM snapshot_schedule
|
||||||
WHERE aid = ${aid}
|
WHERE aid = ${aid}
|
||||||
AND status = 'processing'
|
AND status = 'processing'
|
||||||
`
|
`;
|
||||||
return rows.length > 0;
|
return rows.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,7 +90,7 @@ export async function bulkGetVideosWithoutProcessingSchedules(sql: Psql, aids: n
|
|||||||
WHERE aid = ANY(${aids})
|
WHERE aid = ANY(${aids})
|
||||||
AND status != 'processing'
|
AND status != 'processing'
|
||||||
GROUP BY aid
|
GROUP BY aid
|
||||||
`
|
`;
|
||||||
return rows.map((row) => Number(row.aid));
|
return rows.map((row) => Number(row.aid));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -346,6 +346,6 @@ export async function getAllVideosWithoutActiveSnapshotSchedule(psql: Psql) {
|
|||||||
FROM bilibili_metadata s
|
FROM bilibili_metadata s
|
||||||
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||||
WHERE ss.aid IS NULL
|
WHERE ss.aid IS NULL
|
||||||
`
|
`;
|
||||||
return rows.map((r) => Number(r.aid));
|
return rows.map((r) => Number(r.aid));
|
||||||
}
|
}
|
||||||
|
@ -2,7 +2,7 @@ import type { Psql } from "@core/db/psql.d.ts";
|
|||||||
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
|
import { parseTimestampFromPsql } from "utils/formatTimestampToPostgre.ts";
|
||||||
|
|
||||||
export async function getNotCollectedSongs(sql: Psql) {
|
export async function getNotCollectedSongs(sql: Psql) {
|
||||||
const rows = await sql<{ aid: number }[]>`
|
const rows = await sql<{ aid: number }[]>`
|
||||||
SELECT lr.aid
|
SELECT lr.aid
|
||||||
FROM labelling_result lr
|
FROM labelling_result lr
|
||||||
WHERE lr.label != 0
|
WHERE lr.label != 0
|
||||||
@ -12,28 +12,28 @@ export async function getNotCollectedSongs(sql: Psql) {
|
|||||||
WHERE s.aid = lr.aid
|
WHERE s.aid = lr.aid
|
||||||
);
|
);
|
||||||
`;
|
`;
|
||||||
return rows.map((row) => row.aid);
|
return rows.map((row) => row.aid);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function aidExistsInSongs(sql: Psql, aid: number) {
|
export async function aidExistsInSongs(sql: Psql, aid: number) {
|
||||||
const rows = await sql<{ exists: boolean }[]>`
|
const rows = await sql<{ exists: boolean }[]>`
|
||||||
SELECT EXISTS (
|
SELECT EXISTS (
|
||||||
SELECT 1
|
SELECT 1
|
||||||
FROM songs
|
FROM songs
|
||||||
WHERE aid = ${aid}
|
WHERE aid = ${aid}
|
||||||
);
|
);
|
||||||
`;
|
`;
|
||||||
return rows[0].exists;
|
return rows[0].exists;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getSongsPublihsedAt(sql: Psql, aid: number) {
|
export async function getSongsPublihsedAt(sql: Psql, aid: number) {
|
||||||
const rows = await sql<{ published_at: string }[]>`
|
const rows = await sql<{ published_at: string }[]>`
|
||||||
SELECT published_at
|
SELECT published_at
|
||||||
FROM songs
|
FROM songs
|
||||||
WHERE aid = ${aid};
|
WHERE aid = ${aid};
|
||||||
`;
|
`;
|
||||||
if (rows.length === 0) {
|
if (rows.length === 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return parseTimestampFromPsql(rows[0].published_at);
|
return parseTimestampFromPsql(rows[0].published_at);
|
||||||
}
|
}
|
@ -16,8 +16,8 @@ class AkariProto extends AIManager {
|
|||||||
constructor() {
|
constructor() {
|
||||||
super();
|
super();
|
||||||
this.models = {
|
this.models = {
|
||||||
"classifier": onnxClassifierPath,
|
classifier: onnxClassifierPath,
|
||||||
"embedding": onnxEmbeddingPath,
|
embedding: onnxEmbeddingPath
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,7 +55,7 @@ class AkariProto extends AIManager {
|
|||||||
|
|
||||||
const { input_ids } = await tokenizer(texts, {
|
const { input_ids } = await tokenizer(texts, {
|
||||||
add_special_tokens: false,
|
add_special_tokens: false,
|
||||||
return_tensor: false,
|
return_tensor: false
|
||||||
});
|
});
|
||||||
|
|
||||||
const cumsum = (arr: number[]): number[] =>
|
const cumsum = (arr: number[]): number[] =>
|
||||||
@ -66,9 +66,9 @@ class AkariProto extends AIManager {
|
|||||||
|
|
||||||
const inputs = {
|
const inputs = {
|
||||||
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
|
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
|
||||||
flattened_input_ids.length,
|
flattened_input_ids.length
|
||||||
]),
|
]),
|
||||||
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
|
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length])
|
||||||
};
|
};
|
||||||
|
|
||||||
const { embeddings } = await session.run(inputs);
|
const { embeddings } = await session.run(inputs);
|
||||||
@ -77,21 +77,14 @@ class AkariProto extends AIManager {
|
|||||||
|
|
||||||
private async runClassification(embeddings: number[]): Promise<number[]> {
|
private async runClassification(embeddings: number[]): Promise<number[]> {
|
||||||
const session = this.getModelSession("classifier");
|
const session = this.getModelSession("classifier");
|
||||||
const inputTensor = new ort.Tensor(
|
const inputTensor = new ort.Tensor(Float32Array.from(embeddings), [1, 3, 1024]);
|
||||||
Float32Array.from(embeddings),
|
|
||||||
[1, 3, 1024],
|
|
||||||
);
|
|
||||||
|
|
||||||
const { logits } = await session.run({ channel_features: inputTensor });
|
const { logits } = await session.run({ channel_features: inputTensor });
|
||||||
return this.softmax(logits.data as Float32Array);
|
return this.softmax(logits.data as Float32Array);
|
||||||
}
|
}
|
||||||
|
|
||||||
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
|
public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
|
||||||
const embeddings = await this.getJinaEmbeddings1024([
|
const embeddings = await this.getJinaEmbeddings1024([title, description, tags]);
|
||||||
title,
|
|
||||||
description,
|
|
||||||
tags,
|
|
||||||
]);
|
|
||||||
const probabilities = await this.runClassification(embeddings);
|
const probabilities = await this.runClassification(embeddings);
|
||||||
if (aid) {
|
if (aid) {
|
||||||
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
|
logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
|
||||||
|
@ -6,8 +6,7 @@ export class AIManager {
|
|||||||
public sessions: { [key: string]: ort.InferenceSession } = {};
|
public sessions: { [key: string]: ort.InferenceSession } = {};
|
||||||
public models: { [key: string]: string } = {};
|
public models: { [key: string]: string } = {};
|
||||||
|
|
||||||
constructor() {
|
constructor() {}
|
||||||
}
|
|
||||||
|
|
||||||
public async init() {
|
public async init() {
|
||||||
const modelKeys = Object.keys(this.models);
|
const modelKeys = Object.keys(this.models);
|
||||||
|
@ -3,9 +3,26 @@ import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/
|
|||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
import { lockManager } from "@core/mq/lockManager.ts";
|
import { lockManager } from "@core/mq/lockManager.ts";
|
||||||
import { getLatestVideoSnapshot } from "db/snapshot.ts";
|
import { getLatestVideoSnapshot } from "db/snapshot.ts";
|
||||||
import { HOUR, MINUTE } from "@core/const/time.ts";
|
import { MINUTE } from "@core/const/time.ts";
|
||||||
import { sql } from "@core/db/dbNew";
|
import { sql } from "@core/db/dbNew";
|
||||||
|
|
||||||
|
/**
 * Computes the timestamp of the upcoming Saturday at 00:00:00.000 local time.
 *
 * The result is never on the same calendar day as `from`: when `from` already
 * falls on a Saturday, the Saturday one week later is returned.
 *
 * @param from - Reference instant to count from; defaults to the current time.
 *               The passed Date is not mutated.
 * @returns Milliseconds since the Unix epoch for that Saturday's local midnight.
 */
function getNextSaturdayMidnightTimestamp(from: Date = new Date()): number {
	const SATURDAY = 6;
	const DAYS_PER_WEEK = 7;

	// The modulo yields 0 when `from` is a Saturday; `|| DAYS_PER_WEEK` pushes
	// that case a full week ahead so the target is always on a later day.
	const daysUntilNextSaturday =
		(SATURDAY - from.getDay() + DAYS_PER_WEEK) % DAYS_PER_WEEK || DAYS_PER_WEEK;

	// Work on a copy so the caller's Date stays untouched.
	const nextSaturday = new Date(from);
	nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday);
	nextSaturday.setHours(0, 0, 0, 0);

	return nextSaturday.getTime();
}
|
||||||
|
|
||||||
export const archiveSnapshotsWorker = async (_job: Job) => {
|
export const archiveSnapshotsWorker = async (_job: Job) => {
|
||||||
try {
|
try {
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
@ -20,15 +37,16 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
|
|||||||
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);
|
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||||
const interval = 168;
|
const nextSatMidnight = getNextSaturdayMidnightTimestamp();
|
||||||
|
const interval = nextSatMidnight - now;
|
||||||
logger.log(
|
logger.log(
|
||||||
`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
|
`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
|
||||||
"mq",
|
"mq",
|
||||||
"fn:archiveSnapshotsWorker"
|
"fn:archiveSnapshotsWorker"
|
||||||
);
|
);
|
||||||
const targetTime = lastSnapshotedAt + interval * HOUR;
|
const targetTime = lastSnapshotedAt + interval;
|
||||||
await scheduleSnapshot(sql, aid, "archive", targetTime);
|
await scheduleSnapshot(sql, aid, "archive", targetTime);
|
||||||
if (now - startedAt > 250 * MINUTE) {
|
if (now - startedAt > 30 * MINUTE) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,7 +34,7 @@ export const classifyVideoWorker = async (job: Job) => {
|
|||||||
|
|
||||||
await job.updateData({
|
await job.updateData({
|
||||||
...job.data,
|
...job.data,
|
||||||
label: label,
|
label: label
|
||||||
});
|
});
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -46,7 +46,7 @@ export const classifyVideosWorker = async () => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await lockManager.acquireLock("classifyVideos");
|
await lockManager.acquireLock("classifyVideos", 5 * 60);
|
||||||
|
|
||||||
const videos = await getUnlabelledVideos(sql);
|
const videos = await getUnlabelledVideos(sql);
|
||||||
logger.log(`Found ${videos.length} unlabelled videos`);
|
logger.log(`Found ${videos.length} unlabelled videos`);
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { collectSongs } from "mq/task/collectSongs.ts";
|
import { collectSongs } from "mq/task/collectSongs.ts";
|
||||||
|
|
||||||
export const collectSongsWorker = async (_job: Job): Promise<void> =>{
|
export const collectSongsWorker = async (_job: Job): Promise<void> => {
|
||||||
await collectSongs();
|
await collectSongs();
|
||||||
return;
|
return;
|
||||||
}
|
};
|
||||||
|
@ -26,4 +26,4 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
|
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
|
||||||
}
|
}
|
||||||
}
|
};
|
||||||
|
@ -1,10 +1,7 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { getLatestVideoSnapshot } from "db/snapshot.ts";
|
import { getLatestVideoSnapshot } from "db/snapshot.ts";
|
||||||
import { truncate } from "utils/truncate.ts";
|
import { truncate } from "utils/truncate.ts";
|
||||||
import {
|
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule.ts";
|
||||||
getVideosWithoutActiveSnapshotScheduleByType,
|
|
||||||
scheduleSnapshot
|
|
||||||
} from "db/snapshotSchedule.ts";
|
|
||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
import { HOUR, MINUTE, WEEK } from "@core/const/time.ts";
|
import { HOUR, MINUTE, WEEK } from "@core/const/time.ts";
|
||||||
import { lockManager } from "@core/mq/lockManager.ts";
|
import { lockManager } from "@core/mq/lockManager.ts";
|
||||||
|
@ -2,6 +2,6 @@ import { sql } from "@core/db/dbNew";
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
|
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
|
||||||
|
|
||||||
export const getLatestVideosWorker = async (_job: Job): Promise<void> =>{
|
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
|
||||||
await queueLatestVideos(sql);
|
await queueLatestVideos(sql);
|
||||||
}
|
};
|
||||||
|
@ -10,4 +10,4 @@ export const getVideoInfoWorker = async (job: Job): Promise<void> => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
await insertVideoInfo(sql, aid);
|
await insertVideoInfo(sql, aid);
|
||||||
}
|
};
|
||||||
|
@ -5,15 +5,15 @@ import {
|
|||||||
getBulkSnapshotsInNextSecond,
|
getBulkSnapshotsInNextSecond,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond,
|
||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
videoHasProcessingSchedule,
|
videoHasProcessingSchedule
|
||||||
} from "db/snapshotSchedule.ts";
|
} from "db/snapshotSchedule.ts";
|
||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
import { SnapshotQueue } from "mq/index.ts";
|
import { SnapshotQueue } from "mq/index.ts";
|
||||||
import { sql } from "@core/db/dbNew";
|
import { sql } from "@core/db/dbNew";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
milestone: 1,
|
||||||
"normal": 3,
|
normal: 3
|
||||||
};
|
};
|
||||||
|
|
||||||
export const bulkSnapshotTickWorker = async (_job: Job) => {
|
export const bulkSnapshotTickWorker = async (_job: Job) => {
|
||||||
@ -35,12 +35,16 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
|
|||||||
created_at: schedule.created_at,
|
created_at: schedule.created_at,
|
||||||
started_at: schedule.started_at,
|
started_at: schedule.started_at,
|
||||||
finished_at: schedule.finished_at,
|
finished_at: schedule.finished_at,
|
||||||
status: schedule.status,
|
status: schedule.status
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
await SnapshotQueue.add("bulkSnapshotVideo", {
|
await SnapshotQueue.add(
|
||||||
schedules: schedulesData,
|
"bulkSnapshotVideo",
|
||||||
}, { priority: 3 });
|
{
|
||||||
|
schedules: schedulesData
|
||||||
|
},
|
||||||
|
{ priority: 3 }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return `OK`;
|
return `OK`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -61,11 +65,15 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
}
|
}
|
||||||
const aid = Number(schedule.aid);
|
const aid = Number(schedule.aid);
|
||||||
await setSnapshotStatus(sql, schedule.id, "processing");
|
await setSnapshotStatus(sql, schedule.id, "processing");
|
||||||
await SnapshotQueue.add("snapshotVideo", {
|
await SnapshotQueue.add(
|
||||||
aid: Number(aid),
|
"snapshotVideo",
|
||||||
id: Number(schedule.id),
|
{
|
||||||
type: schedule.type ?? "normal",
|
aid: Number(aid),
|
||||||
}, { priority });
|
id: Number(schedule.id),
|
||||||
|
type: schedule.type ?? "normal"
|
||||||
|
},
|
||||||
|
{ priority }
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return `OK`;
|
return `OK`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
|
@ -10,9 +10,9 @@ import { NetSchedulerError } from "@core/net/delegate.ts";
|
|||||||
import { sql } from "@core/db/dbNew.ts";
|
import { sql } from "@core/db/dbNew.ts";
|
||||||
|
|
||||||
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
||||||
"milestone": "snapshotMilestoneVideo",
|
milestone: "snapshotMilestoneVideo",
|
||||||
"normal": "snapshotVideo",
|
normal: "snapshotVideo",
|
||||||
"new": "snapshotMilestoneVideo",
|
new: "snapshotMilestoneVideo"
|
||||||
};
|
};
|
||||||
|
|
||||||
export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
||||||
@ -31,7 +31,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
|||||||
logger.warn(
|
logger.warn(
|
||||||
`Video ${aid} has status ${status} in the database. Abort snapshoting.`,
|
`Video ${aid} has status ${status} in the database. Abort snapshoting.`,
|
||||||
"mq",
|
"mq",
|
||||||
"fn:dispatchRegularSnapshotsWorker",
|
"fn:dispatchRegularSnapshotsWorker"
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -43,7 +43,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
|||||||
logger.warn(
|
logger.warn(
|
||||||
`Bilibili return status ${status} when snapshoting for ${aid}.`,
|
`Bilibili return status ${status} when snapshoting for ${aid}.`,
|
||||||
"mq",
|
"mq",
|
||||||
"fn:dispatchRegularSnapshotsWorker",
|
"fn:dispatchRegularSnapshotsWorker"
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -51,7 +51,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
|||||||
if (type === "new") {
|
if (type === "new") {
|
||||||
const publihsedAt = await getSongsPublihsedAt(sql, aid);
|
const publihsedAt = await getSongsPublihsedAt(sql, aid);
|
||||||
const timeSincePublished = stat.time - publihsedAt!;
|
const timeSincePublished = stat.time - publihsedAt!;
|
||||||
const viewsPerHour = stat.views / timeSincePublished * HOUR;
|
const viewsPerHour = (stat.views / timeSincePublished) * HOUR;
|
||||||
if (timeSincePublished > 48 * HOUR) {
|
if (timeSincePublished > 48 * HOUR) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -77,7 +77,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
|||||||
logger.warn(
|
logger.warn(
|
||||||
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
|
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
|
||||||
"mq",
|
"mq",
|
||||||
"fn:snapshotVideoWorker",
|
"fn:snapshotVideoWorker"
|
||||||
);
|
);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -86,23 +86,17 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
|
|||||||
await scheduleSnapshot(sql, aid, type, targetTime);
|
await scheduleSnapshot(sql, aid, type, targetTime);
|
||||||
await setSnapshotStatus(sql, id, "completed");
|
await setSnapshotStatus(sql, id, "completed");
|
||||||
return;
|
return;
|
||||||
}
|
} catch (e) {
|
||||||
catch (e) {
|
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||||
logger.warn(
|
logger.warn(`No available proxy for aid ${job.data.aid}.`, "mq", "fn:snapshotVideoWorker");
|
||||||
`No available proxy for aid ${job.data.aid}.`,
|
|
||||||
"mq",
|
|
||||||
"fn:snapshotVideoWorker",
|
|
||||||
);
|
|
||||||
await setSnapshotStatus(sql, id, "no_proxy");
|
await setSnapshotStatus(sql, id, "no_proxy");
|
||||||
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
|
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
|
||||||
return;
|
return;
|
||||||
}
|
} else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
|
||||||
else if (e instanceof NetSchedulerError && e.code === "ALICLOUD_PROXY_ERR") {
|
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`Failed to proxy request for aid ${job.data.aid}: ${e.message}`,
|
`Failed to proxy request for aid ${job.data.aid}: ${e.message}`,
|
||||||
"mq",
|
"mq",
|
||||||
"fn:snapshotVideoWorker",
|
"fn:snapshotVideoWorker"
|
||||||
);
|
);
|
||||||
await setSnapshotStatus(sql, id, "failed");
|
await setSnapshotStatus(sql, id, "failed");
|
||||||
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
|
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
|
||||||
|
@ -3,7 +3,7 @@ import {
|
|||||||
bulkScheduleSnapshot,
|
bulkScheduleSnapshot,
|
||||||
bulkSetSnapshotStatus,
|
bulkSetSnapshotStatus,
|
||||||
scheduleSnapshot,
|
scheduleSnapshot,
|
||||||
snapshotScheduleExists,
|
snapshotScheduleExists
|
||||||
} from "db/snapshotSchedule.ts";
|
} from "db/snapshotSchedule.ts";
|
||||||
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
@ -55,7 +55,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
|||||||
${shares},
|
${shares},
|
||||||
${favorites}
|
${favorites}
|
||||||
)
|
)
|
||||||
`
|
`;
|
||||||
|
|
||||||
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
|
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
|
||||||
}
|
}
|
||||||
@ -72,11 +72,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
|||||||
return `DONE`;
|
return `DONE`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||||
logger.warn(
|
logger.warn(`No available proxy for bulk request now.`, "mq", "fn:takeBulkSnapshotForVideosWorker");
|
||||||
`No available proxy for bulk request now.`,
|
|
||||||
"mq",
|
|
||||||
"fn:takeBulkSnapshotForVideosWorker",
|
|
||||||
);
|
|
||||||
await bulkSetSnapshotStatus(sql, ids, "no_proxy");
|
await bulkSetSnapshotStatus(sql, ids, "no_proxy");
|
||||||
await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
|
await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
|
||||||
return;
|
return;
|
||||||
|
@ -2,13 +2,13 @@ import { Queue, ConnectionOptions } from "bullmq";
|
|||||||
import { redis } from "@core/db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
|
|
||||||
export const LatestVideosQueue = new Queue("latestVideos", {
|
export const LatestVideosQueue = new Queue("latestVideos", {
|
||||||
connection: redis as ConnectionOptions
|
connection: redis as ConnectionOptions
|
||||||
});
|
});
|
||||||
|
|
||||||
export const ClassifyVideoQueue = new Queue("classifyVideo", {
|
export const ClassifyVideoQueue = new Queue("classifyVideo", {
|
||||||
connection: redis as ConnectionOptions
|
connection: redis as ConnectionOptions
|
||||||
});
|
});
|
||||||
|
|
||||||
export const SnapshotQueue = new Queue("snapshot", {
|
export const SnapshotQueue = new Queue("snapshot", {
|
||||||
connection: redis as ConnectionOptions
|
connection: redis as ConnectionOptions
|
||||||
});
|
});
|
||||||
|
@ -62,8 +62,8 @@ export async function initMQ() {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
|
||||||
every: 6 * HOUR,
|
every: 2 * HOUR,
|
||||||
immediately: true
|
immediately: false
|
||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
||||||
|
@ -12,13 +12,12 @@ const getFactor = (x: number) => {
|
|||||||
const c = 100;
|
const c = 100;
|
||||||
const u = 0.601;
|
const u = 0.601;
|
||||||
const g = 455;
|
const g = 455;
|
||||||
if (x>g) {
|
if (x > g) {
|
||||||
return log(b/log(x+1),a);
|
return log(b / log(x + 1), a);
|
||||||
|
} else {
|
||||||
|
return log(b / log(x + c), a) + u;
|
||||||
}
|
}
|
||||||
else {
|
};
|
||||||
return log(b/log(x+c),a)+u;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Returns the minimum ETA in hours for the next snapshot
|
* Returns the minimum ETA in hours for the next snapshot
|
||||||
|
@ -25,5 +25,5 @@ export async function insertIntoSongs(sql: Psql, aid: number) {
|
|||||||
(SELECT duration FROM bilibili_metadata WHERE aid = ${aid})
|
(SELECT duration FROM bilibili_metadata WHERE aid = ${aid})
|
||||||
)
|
)
|
||||||
ON CONFLICT DO NOTHING
|
ON CONFLICT DO NOTHING
|
||||||
`
|
`;
|
||||||
}
|
}
|
||||||
|
@ -18,9 +18,9 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
|
|||||||
const bvid = data.View.bvid;
|
const bvid = data.View.bvid;
|
||||||
const desc = data.View.desc;
|
const desc = data.View.desc;
|
||||||
const uid = data.View.owner.mid;
|
const uid = data.View.owner.mid;
|
||||||
const tags = data.Tags
|
const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
|
||||||
.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type))
|
.map((tag) => tag.tag_name)
|
||||||
.map((tag) => tag.tag_name).join(",");
|
.join(",");
|
||||||
const title = data.View.title;
|
const title = data.View.title;
|
||||||
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
|
const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
|
||||||
const duration = data.View.duration;
|
const duration = data.View.duration;
|
||||||
@ -55,7 +55,7 @@ export async function insertVideoInfo(sql: Psql, aid: number) {
|
|||||||
${stat.share},
|
${stat.share},
|
||||||
${stat.favorite}
|
${stat.favorite}
|
||||||
)
|
)
|
||||||
`
|
`;
|
||||||
|
|
||||||
logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
|
logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
|
||||||
await ClassifyVideoQueue.add("classifyVideo", { aid });
|
await ClassifyVideoQueue.add("classifyVideo", { aid });
|
||||||
|
@ -24,11 +24,7 @@ export interface SnapshotNumber {
|
|||||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
*/
|
*/
|
||||||
export async function insertVideoSnapshot(
|
export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise<number | SnapshotNumber> {
|
||||||
sql: Psql,
|
|
||||||
aid: number,
|
|
||||||
task: string,
|
|
||||||
): Promise<number | SnapshotNumber> {
|
|
||||||
const data = await getVideoInfo(aid, task);
|
const data = await getVideoInfo(aid, task);
|
||||||
if (typeof data == "number") {
|
if (typeof data == "number") {
|
||||||
return data;
|
return data;
|
||||||
@ -42,10 +38,10 @@ export async function insertVideoSnapshot(
|
|||||||
const shares = data.stat.share;
|
const shares = data.stat.share;
|
||||||
const favorites = data.stat.favorite;
|
const favorites = data.stat.favorite;
|
||||||
|
|
||||||
await sql`
|
await sql`
|
||||||
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
||||||
VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites})
|
VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites})
|
||||||
`
|
`;
|
||||||
|
|
||||||
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
|
logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot");
|
||||||
|
|
||||||
@ -58,6 +54,6 @@ export async function insertVideoSnapshot(
|
|||||||
coins,
|
coins,
|
||||||
shares,
|
shares,
|
||||||
favorites,
|
favorites,
|
||||||
time,
|
time
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -6,9 +6,7 @@ import logger from "@core/log/logger.ts";
|
|||||||
import { LatestVideosQueue } from "mq/index.ts";
|
import { LatestVideosQueue } from "mq/index.ts";
|
||||||
import type { Psql } from "@core/db/psql.d.ts";
|
import type { Psql } from "@core/db/psql.d.ts";
|
||||||
|
|
||||||
export async function queueLatestVideos(
|
export async function queueLatestVideos(sql: Psql): Promise<number | null> {
|
||||||
sql: Psql,
|
|
||||||
): Promise<number | null> {
|
|
||||||
let page = 1;
|
let page = 1;
|
||||||
let i = 0;
|
let i = 0;
|
||||||
const videosFound = new Set();
|
const videosFound = new Set();
|
||||||
@ -26,14 +24,18 @@ export async function queueLatestVideos(
|
|||||||
if (videoExists) {
|
if (videoExists) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
await LatestVideosQueue.add("getVideoInfo", { aid }, {
|
await LatestVideosQueue.add(
|
||||||
delay,
|
"getVideoInfo",
|
||||||
attempts: 100,
|
{ aid },
|
||||||
backoff: {
|
{
|
||||||
type: "fixed",
|
delay,
|
||||||
delay: SECOND * 5,
|
attempts: 100,
|
||||||
},
|
backoff: {
|
||||||
});
|
type: "fixed",
|
||||||
|
delay: SECOND * 5
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
videosFound.add(aid);
|
videosFound.add(aid);
|
||||||
allExists = false;
|
allExists = false;
|
||||||
delay += Math.random() * SECOND * 1.5;
|
delay += Math.random() * SECOND * 1.5;
|
||||||
@ -42,7 +44,7 @@ export async function queueLatestVideos(
|
|||||||
logger.log(
|
logger.log(
|
||||||
`Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`,
|
`Page ${page} crawled, total: ${videosFound.size}/${i} videos added/observed.`,
|
||||||
"net",
|
"net",
|
||||||
"fn:queueLatestVideos()",
|
"fn:queueLatestVideos()"
|
||||||
);
|
);
|
||||||
if (allExists) {
|
if (allExists) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -14,7 +14,7 @@ export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => {
|
|||||||
if (hoursDiff < 8) return 24;
|
if (hoursDiff < 8) return 24;
|
||||||
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
|
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
|
||||||
if (viewsDiff === 0) return 72;
|
if (viewsDiff === 0) return 72;
|
||||||
const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
|
const speedPerDay = (viewsDiff / (hoursDiff + 0.001)) * 24;
|
||||||
if (speedPerDay < 6) return 36;
|
if (speedPerDay < 6) return 36;
|
||||||
if (speedPerDay < 120) return 24;
|
if (speedPerDay < 120) return 24;
|
||||||
if (speedPerDay < 320) return 12;
|
if (speedPerDay < 320) return 12;
|
||||||
|
@ -2,11 +2,7 @@ import { sql } from "@core/db/dbNew";
|
|||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
|
|
||||||
export async function removeAllTimeoutSchedules() {
|
export async function removeAllTimeoutSchedules() {
|
||||||
logger.log(
|
logger.log("Too many timeout schedules, directly removing these schedules...", "mq", "fn:scheduleCleanupWorker");
|
||||||
"Too many timeout schedules, directly removing these schedules...",
|
|
||||||
"mq",
|
|
||||||
"fn:scheduleCleanupWorker",
|
|
||||||
);
|
|
||||||
return await sql`
|
return await sql`
|
||||||
DELETE FROM snapshot_schedule
|
DELETE FROM snapshot_schedule
|
||||||
WHERE status IN ('pending', 'processing')
|
WHERE status IN ('pending', 'processing')
|
||||||
|
@ -7,7 +7,8 @@
|
|||||||
"worker:filter": "bun run ./build/filterWorker.js",
|
"worker:filter": "bun run ./build/filterWorker.js",
|
||||||
"adder": "bun run ./src/jobAdder.ts",
|
"adder": "bun run ./src/jobAdder.ts",
|
||||||
"bullui": "bun run ./src/bullui.ts",
|
"bullui": "bun run ./src/bullui.ts",
|
||||||
"all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'"
|
"all": "bun run concurrently --restart-tries -1 'bun run worker:main' 'bun run adder' 'bun run worker:filter'",
|
||||||
|
"format": "prettier --write ."
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"concurrently": "^9.1.2"
|
"concurrently": "^9.1.2"
|
||||||
|
@ -3,10 +3,9 @@ import Bun from "bun";
|
|||||||
await Bun.build({
|
await Bun.build({
|
||||||
entrypoints: ["./src/filterWorker.ts"],
|
entrypoints: ["./src/filterWorker.ts"],
|
||||||
outdir: "./build",
|
outdir: "./build",
|
||||||
target: "node"
|
target: "node"
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
const file = Bun.file("./build/filterWorker.js");
|
const file = Bun.file("./build/filterWorker.js");
|
||||||
const code = await file.text();
|
const code = await file.text();
|
||||||
|
|
||||||
|
@ -11,9 +11,9 @@ createBullBoard({
|
|||||||
queues: [
|
queues: [
|
||||||
new BullMQAdapter(LatestVideosQueue),
|
new BullMQAdapter(LatestVideosQueue),
|
||||||
new BullMQAdapter(ClassifyVideoQueue),
|
new BullMQAdapter(ClassifyVideoQueue),
|
||||||
new BullMQAdapter(SnapshotQueue),
|
new BullMQAdapter(SnapshotQueue)
|
||||||
],
|
],
|
||||||
serverAdapter: serverAdapter,
|
serverAdapter: serverAdapter
|
||||||
});
|
});
|
||||||
|
|
||||||
const app = express();
|
const app = express();
|
||||||
|
@ -7,13 +7,13 @@ import { lockManager } from "@core/mq/lockManager.ts";
|
|||||||
import Akari from "ml/akari.ts";
|
import Akari from "ml/akari.ts";
|
||||||
|
|
||||||
const shutdown = async (signal: string) => {
|
const shutdown = async (signal: string) => {
|
||||||
logger.log(`${signal} Received: Shutting down workers...`, "mq");
|
logger.log(`${signal} Received: Shutting down workers...`, "mq");
|
||||||
await filterWorker.close(true);
|
await filterWorker.close(true);
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
process.on('SIGINT', () => shutdown('SIGINT'));
|
process.on("SIGINT", () => shutdown("SIGINT"));
|
||||||
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
||||||
|
|
||||||
await Akari.init();
|
await Akari.init();
|
||||||
|
|
||||||
@ -29,7 +29,7 @@ const filterWorker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } },
|
{ connection: redis as ConnectionOptions, concurrency: 2, removeOnComplete: { count: 1000 } }
|
||||||
);
|
);
|
||||||
|
|
||||||
filterWorker.on("active", () => {
|
filterWorker.on("active", () => {
|
||||||
|
@ -10,7 +10,7 @@ import {
|
|||||||
scheduleCleanupWorker,
|
scheduleCleanupWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
snapshotVideoWorker,
|
snapshotVideoWorker,
|
||||||
takeBulkSnapshotForVideosWorker,
|
takeBulkSnapshotForVideosWorker
|
||||||
} from "mq/exec/executors.ts";
|
} from "mq/exec/executors.ts";
|
||||||
import { redis } from "@core/db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
import logger from "@core/log/logger.ts";
|
import logger from "@core/log/logger.ts";
|
||||||
@ -30,15 +30,15 @@ const releaseAllLocks = async () => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
const shutdown = async (signal: string) => {
|
const shutdown = async (signal: string) => {
|
||||||
logger.log(`${signal} Received: Shutting down workers...`, "mq");
|
logger.log(`${signal} Received: Shutting down workers...`, "mq");
|
||||||
await releaseAllLocks();
|
await releaseAllLocks();
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
await snapshotWorker.close(true);
|
await snapshotWorker.close(true);
|
||||||
process.exit(0);
|
process.exit(0);
|
||||||
};
|
};
|
||||||
|
|
||||||
process.on('SIGINT', () => shutdown('SIGINT'));
|
process.on("SIGINT", () => shutdown("SIGINT"));
|
||||||
process.on('SIGTERM', () => shutdown('SIGTERM'));
|
process.on("SIGTERM", () => shutdown("SIGTERM"));
|
||||||
|
|
||||||
const latestVideoWorker = new Worker(
|
const latestVideoWorker = new Worker(
|
||||||
"latestVideos",
|
"latestVideos",
|
||||||
@ -58,8 +58,8 @@ const latestVideoWorker = new Worker(
|
|||||||
connection: redis as ConnectionOptions,
|
connection: redis as ConnectionOptions,
|
||||||
concurrency: 6,
|
concurrency: 6,
|
||||||
removeOnComplete: { count: 1440 },
|
removeOnComplete: { count: 1440 },
|
||||||
removeOnFail: { count: 0 },
|
removeOnFail: { count: 0 }
|
||||||
},
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
latestVideoWorker.on("active", () => {
|
latestVideoWorker.on("active", () => {
|
||||||
@ -95,7 +95,7 @@ const snapshotWorker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } },
|
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } }
|
||||||
);
|
);
|
||||||
|
|
||||||
snapshotWorker.on("error", (err) => {
|
snapshotWorker.on("error", (err) => {
|
||||||
|
@ -56,65 +56,65 @@ const databasePreparationQuery = `
|
|||||||
CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status);
|
CREATE INDEX idx_snapshot_schedule_status ON snapshot_schedule USING btree (status);
|
||||||
CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type);
|
CREATE INDEX idx_snapshot_schedule_type ON snapshot_schedule USING btree (type);
|
||||||
CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id);
|
CREATE UNIQUE INDEX snapshot_schedule_pkey ON snapshot_schedule USING btree (id);
|
||||||
`
|
`;
|
||||||
|
|
||||||
const cleanUpQuery = `
|
const cleanUpQuery = `
|
||||||
DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE;
|
DROP SEQUENCE IF EXISTS "snapshot_schedule_id_seq" CASCADE;
|
||||||
DROP TABLE IF EXISTS "snapshot_schedule" CASCADE;
|
DROP TABLE IF EXISTS "snapshot_schedule" CASCADE;
|
||||||
`
|
`;
|
||||||
|
|
||||||
async function testMocking() {
|
async function testMocking() {
|
||||||
await sql.begin(async tx => {
|
await sql.begin(async (tx) => {
|
||||||
await tx.unsafe(cleanUpQuery).simple();
|
await tx.unsafe(cleanUpQuery).simple();
|
||||||
await tx.unsafe(databasePreparationQuery).simple();
|
await tx.unsafe(databasePreparationQuery).simple();
|
||||||
|
|
||||||
await tx`
|
await tx`
|
||||||
INSERT INTO snapshot_schedule
|
INSERT INTO snapshot_schedule
|
||||||
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')}
|
${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
|
||||||
`;
|
`;
|
||||||
|
|
||||||
await tx`
|
await tx`
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
`
|
`;
|
||||||
|
|
||||||
await tx.unsafe(cleanUpQuery).simple();
|
await tx.unsafe(cleanUpQuery).simple();
|
||||||
return;
|
return;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async function testBulkSetSnapshotStatus() {
|
async function testBulkSetSnapshotStatus() {
|
||||||
return await sql.begin(async tx => {
|
return await sql.begin(async (tx) => {
|
||||||
await tx.unsafe(cleanUpQuery).simple();
|
await tx.unsafe(cleanUpQuery).simple();
|
||||||
await tx.unsafe(databasePreparationQuery).simple();
|
await tx.unsafe(databasePreparationQuery).simple();
|
||||||
|
|
||||||
await tx`
|
await tx`
|
||||||
INSERT INTO snapshot_schedule
|
INSERT INTO snapshot_schedule
|
||||||
${sql(mockSnapshotSchedules, 'aid', 'created_at', 'finished_at', 'id', 'started_at', 'status', 'type')}
|
${sql(mockSnapshotSchedules, "aid", "created_at", "finished_at", "id", "started_at", "status", "type")}
|
||||||
`;
|
`;
|
||||||
|
|
||||||
const ids = [1, 2, 3];
|
const ids = [1, 2, 3];
|
||||||
|
|
||||||
await bulkSetSnapshotStatus(tx, ids, 'pending')
|
await bulkSetSnapshotStatus(tx, ids, "pending");
|
||||||
|
|
||||||
const rows = tx<{status: string}[]>`
|
const rows = tx<{ status: string }[]>`
|
||||||
SELECT status FROM snapshot_schedule WHERE id = 1;
|
SELECT status FROM snapshot_schedule WHERE id = 1;
|
||||||
`.execute();
|
`.execute();
|
||||||
|
|
||||||
await tx`
|
await tx`
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
`
|
`;
|
||||||
|
|
||||||
await tx.unsafe(cleanUpQuery).simple();
|
await tx.unsafe(cleanUpQuery).simple();
|
||||||
return rows;
|
return rows;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
test("data mocking works", async () => {
|
test("data mocking works", async () => {
|
||||||
await testMocking();
|
await testMocking();
|
||||||
expect(() => {}).not.toThrowError();
|
expect(() => {}).not.toThrowError();
|
||||||
});
|
});
|
||||||
|
|
||||||
test("bulkSetSnapshotStatus core logic works smoothly", async () => {
|
test("bulkSetSnapshotStatus core logic works smoothly", async () => {
|
||||||
const rows = await testBulkSetSnapshotStatus();
|
const rows = await testBulkSetSnapshotStatus();
|
||||||
expect(rows.every(item => item.status === 'pending')).toBe(true);
|
expect(rows.every((item) => item.status === "pending")).toBe(true);
|
||||||
});
|
});
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
export function formatTimestampToPsql(timestamp: number) {
|
export function formatTimestampToPsql(timestamp: number) {
|
||||||
const date = new Date(timestamp);
|
const date = new Date(timestamp);
|
||||||
return date.toISOString().slice(0, 23).replace("T", " ") + '+08';
|
return date.toISOString().slice(0, 23).replace("T", " ") + "+08";
|
||||||
}
|
}
|
||||||
|
|
||||||
export function parseTimestampFromPsql(timestamp: string) {
|
export function parseTimestampFromPsql(timestamp: string) {
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import { defineConfig } from "vitest/config";
|
import { defineConfig } from "vitest/config";
|
||||||
import tsconfigPaths from 'vite-tsconfig-paths'
|
import tsconfigPaths from "vite-tsconfig-paths";
|
||||||
|
|
||||||
export default defineConfig({
|
export default defineConfig({
|
||||||
plugins: [tsconfigPaths()]
|
plugins: [tsconfigPaths()]
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user