Compare commits

...

3 Commits

Author SHA1 Message Date
1ff71ab241
improve: code quality 2025-05-19 00:13:01 +08:00
cf7a285f57
fix: incorrect import for psql.d.ts, remove unnecessary benchmark files 2025-05-19 00:11:53 +08:00
79a37d927a
fix: forget to specify type when collecting videos without active schedules;
missing return when eta too long for milestone snapshot
2025-05-19 00:10:33 +08:00
8 changed files with 44 additions and 401 deletions

View File

@ -63,18 +63,6 @@ export async function snapshotScheduleExists(sql: Psql, id: number) {
return rows.length > 0;
}
export async function videoHasActiveSchedule(sql: Psql, aid: number) {
const rows = await sql<{ status: string }[]>`
SELECT status
FROM snapshot_schedule
WHERE aid = ${aid}
AND (status = 'pending'
OR status = 'processing'
)
`
return rows.length > 0;
}
export async function videoHasActiveScheduleWithType(sql: Psql, aid: number, type: string) {
const rows = await sql<{ status: string }[]>`
SELECT status FROM snapshot_schedule
@ -292,23 +280,23 @@ export async function adjustSnapshotTime(
}
export async function getSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>`
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds' AND status = 'pending' AND type != 'normal'
ORDER BY
CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 10;
`
return rows;
return sql<SnapshotScheduleType[]>`
SELECT *
FROM snapshot_schedule
WHERE started_at <= NOW() + INTERVAL '1 seconds'
AND status = 'pending'
AND type != 'normal'
ORDER BY CASE
WHEN type = 'milestone' THEN 0
ELSE 1
END,
started_at
LIMIT 10;
`;
}
export async function getBulkSnapshotsInNextSecond(sql: Psql) {
const rows = await sql<SnapshotScheduleType[]>`
return sql<SnapshotScheduleType[]>`
SELECT *
FROM snapshot_schedule
WHERE (started_at <= NOW() + INTERVAL '15 seconds')
@ -320,27 +308,33 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) {
END,
started_at
LIMIT 1000;
`
return rows;
`;
}
export async function setSnapshotStatus(sql: Psql, id: number, status: string) {
return await sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ${id}
return sql`
UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ${id}
`;
}
export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) {
return await sql`
UPDATE snapshot_schedule SET status = ${status} WHERE id = ANY(${ids})
return sql`
UPDATE snapshot_schedule
SET status = ${status}
WHERE id = ANY (${ids})
`;
}
export async function getVideosWithoutActiveSnapshotSchedule(sql: Psql) {
export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) {
const rows = await sql<{ aid: string }[]>`
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
LEFT JOIN snapshot_schedule ss ON
s.aid = ss.aid AND
(ss.status = 'pending' OR ss.status = 'processing') AND
ss.type = ${type}
WHERE ss.aid IS NULL
`;
return rows.map((r) => Number(r.aid));

View File

@ -1,179 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// 配置参数
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
const onnxEmbeddingPath = "./model/embedding_original.onnx";
const testDataPath = "./data/filter/test1.jsonl";
// 初始化会话
const [sessionClassifier, sessionEmbedding] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingPath),
]);
let tokenizer: PreTrainedTokenizer;
// 初始化分词器
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// 新的嵌入生成函数使用ONNX
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// 构造输入参数
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// 准备ONNX输入
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// 执行推理
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// 分类推理函数
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 3, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// 指标计算函数
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
speed: string;
} {
// 输出label和prediction不一样的index列表
const arr = [];
for (let i = 0; i < labels.length; i++) {
if (labels[i] !== predictions[i] && predictions[i] == 0) {
arr.push([i + 1, labels[i], predictions[i]]);
}
}
console.log(arr);
// 初始化混淆矩阵
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// 填充矩阵
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// 计算各指标
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
// 计算Class 0 Precision
const class0TP = matrix[0][0];
const class0FP = matrix.flatMap((row, i) => i === 0 ? [] : [row[0]]).reduce((a, b) => a + b, 0);
const class0Precision = class0TP / (class0TP + class0FP) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
"Class 0 Prec": class0Precision,
};
}
// 改造后的评估函数
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
"Class 0 Prec": number;
}> {
const data = await Deno.readTextFile(testDataPath);
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// 主函数
async function main() {
await loadTokenizer();
const metrics = await evaluateModel(sessionEmbedding);
console.log("Model Metrics:");
console.table(metrics);
}
await main();

View File

@ -1,171 +0,0 @@
import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
import * as ort from "onnxruntime";
function softmax(logits: Float32Array): number[] {
const maxLogit = Math.max(...logits);
const exponents = logits.map((logit) => Math.exp(logit - maxLogit));
const sumOfExponents = exponents.reduce((sum, exp) => sum + exp, 0);
return Array.from(exponents.map((exp) => exp / sumOfExponents));
}
// 配置参数
const sentenceTransformerModelName = "alikia2x/jina-embedding-v3-m2v-1024";
const onnxClassifierPath = "./model/video_classifier_v3_11.onnx";
const onnxEmbeddingOriginalPath = "./model/embedding_original.onnx";
const onnxEmbeddingQuantizedPath = "./model/embedding_original.onnx";
// 初始化会话
const [sessionClassifier, sessionEmbeddingOriginal, sessionEmbeddingQuantized] = await Promise.all([
ort.InferenceSession.create(onnxClassifierPath),
ort.InferenceSession.create(onnxEmbeddingOriginalPath),
ort.InferenceSession.create(onnxEmbeddingQuantizedPath),
]);
let tokenizer: PreTrainedTokenizer;
// 初始化分词器
async function loadTokenizer() {
const tokenizerConfig = { local_files_only: true };
tokenizer = await AutoTokenizer.from_pretrained(sentenceTransformerModelName, tokenizerConfig);
}
// 新的嵌入生成函数使用ONNX
async function getONNXEmbeddings(texts: string[], session: ort.InferenceSession): Promise<number[]> {
const { input_ids } = await tokenizer(texts, {
add_special_tokens: false,
return_tensor: false,
});
// 构造输入参数
const cumsum = (arr: number[]): number[] =>
arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
const flattened_input_ids = input_ids.flat();
// 准备ONNX输入
const inputs = {
input_ids: new ort.Tensor("int64", new BigInt64Array(flattened_input_ids.map(BigInt)), [
flattened_input_ids.length,
]),
offsets: new ort.Tensor("int64", new BigInt64Array(offsets.map(BigInt)), [offsets.length]),
};
// 执行推理
const { embeddings } = await session.run(inputs);
return Array.from(embeddings.data as Float32Array);
}
// 分类推理函数
async function runClassification(embeddings: number[]): Promise<number[]> {
const inputTensor = new ort.Tensor(
Float32Array.from(embeddings),
[1, 4, 1024],
);
const { logits } = await sessionClassifier.run({ channel_features: inputTensor });
return softmax(logits.data as Float32Array);
}
// 指标计算函数
function calculateMetrics(labels: number[], predictions: number[], elapsedTime: number): {
accuracy: number;
precision: number;
recall: number;
f1: number;
speed: string;
} {
// 初始化混淆矩阵
const classCount = Math.max(...labels, ...predictions) + 1;
const matrix = Array.from({ length: classCount }, () => Array.from({ length: classCount }, () => 0));
// 填充矩阵
labels.forEach((trueLabel, i) => {
matrix[trueLabel][predictions[i]]++;
});
// 计算各指标
let totalTP = 0, totalFP = 0, totalFN = 0;
for (let c = 0; c < classCount; c++) {
const TP = matrix[c][c];
const FP = matrix.flatMap((row, i) => i === c ? [] : [row[c]]).reduce((a, b) => a + b, 0);
const FN = matrix[c].filter((_, i) => i !== c).reduce((a, b) => a + b, 0);
totalTP += TP;
totalFP += FP;
totalFN += FN;
}
const precision = totalTP / (totalTP + totalFP);
const recall = totalTP / (totalTP + totalFN);
const f1 = 2 * (precision * recall) / (precision + recall) || 0;
return {
accuracy: labels.filter((l, i) => l === predictions[i]).length / labels.length,
precision,
recall,
f1,
speed: `${(labels.length / (elapsedTime / 1000)).toFixed(1)} samples/sec`,
};
}
// 改造后的评估函数
async function evaluateModel(session: ort.InferenceSession): Promise<{
accuracy: number;
precision: number;
recall: number;
f1: number;
}> {
const data = await Deno.readTextFile("./data/filter/test1.jsonl");
const samples = data.split("\n")
.map((line) => {
try {
return JSON.parse(line);
} catch {
return null;
}
})
.filter(Boolean);
const allPredictions: number[] = [];
const allLabels: number[] = [];
const t = new Date().getTime();
for (const sample of samples) {
try {
const embeddings = await getONNXEmbeddings([
sample.title,
sample.description,
sample.tags.join(","),
sample.author_info,
], session);
const probabilities = await runClassification(embeddings);
allPredictions.push(probabilities.indexOf(Math.max(...probabilities)));
allLabels.push(sample.label);
} catch (error) {
console.error("Processing error:", error);
}
}
const elapsed = new Date().getTime() - t;
return calculateMetrics(allLabels, allPredictions, elapsed);
}
// 主函数
async function main() {
await loadTokenizer();
// 评估原始模型
const originalMetrics = await evaluateModel(sessionEmbeddingOriginal);
console.log("Original Model Metrics:");
console.table(originalMetrics);
// 评估量化模型
const quantizedMetrics = await evaluateModel(sessionEmbeddingQuantized);
console.log("Quantized Model Metrics:");
console.table(quantizedMetrics);
}
await main();

View File

@ -16,8 +16,8 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
if (eta > 144) continue;
const now = Date.now();
const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 1 * HOUR;
const minInterval = 1 * SECOND;
const maxInterval = 1.2 * HOUR;
const minInterval = 2 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay;
await scheduleSnapshot(sql, aid, "milestone", targetTime);
@ -25,5 +25,5 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => {
}
} catch (e) {
logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
};
}
}

View File

@ -1,7 +1,10 @@
import { Job } from "bullmq";
import { getLatestVideoSnapshot } from "db/snapshot.ts";
import { truncate } from "utils/truncate.ts";
import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
import {
getVideosWithoutActiveSnapshotScheduleByType,
scheduleSnapshot
} from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, WEEK } from "@core/const/time.ts";
import { lockManager } from "@core/mq/lockManager.ts";
@ -17,7 +20,7 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =
}
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
const aids = await getVideosWithoutActiveSnapshotSchedule(sql);
const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "normal");
for (const rawAid of aids) {
const aid = Number(rawAid);
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);

View File

@ -2,7 +2,6 @@ import { Job } from "bullmq";
import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
import logger from "@core/log/logger.ts";
import { HOUR, MINUTE, SECOND } from "@core/const/time.ts";
import { lockManager } from "@core/mq/lockManager.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { getSongsPublihsedAt } from "db/songs.ts";
@ -78,8 +77,9 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn(
`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
"mq",
"fn:dispatchRegularSnapshotsWorker",
"fn:snapshotVideoWorker",
);
return;
}
const now = Date.now();
const targetTime = now + eta * HOUR;
@ -92,7 +92,7 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
"fn:snapshotVideoWorker",
);
await setSnapshotStatus(sql, id, "no_proxy");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
@ -102,16 +102,12 @@ export const snapshotVideoWorker = async (job: Job): Promise<void> => {
logger.warn(
`Failed to proxy request for aid ${job.data.aid}: ${e.message}`,
"mq",
"fn:takeSnapshotForVideoWorker",
"fn:snapshotVideoWorker",
);
await setSnapshotStatus(sql, id, "failed");
await scheduleSnapshot(sql, aid, type, Date.now() + retryInterval);
}
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
logger.error(e as Error, "mq", "fn:snapshotVideoWorker");
await setSnapshotStatus(sql, id, "failed");
}
finally {
await lockManager.releaseLock("dispatchRegularSnapshots");
};
return;
};

View File

@ -2,7 +2,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db
import { truncate } from "utils/truncate.ts";
import { closetMilestone } from "./exec/snapshotTick.ts";
import { HOUR, MINUTE } from "@core/const/time.ts";
import type { Psql } from "@core/db/global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
@ -34,7 +34,7 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => {
if (!snapshotsEnough) return 0;
const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;

View File

@ -1,6 +1,6 @@
import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
import { HOUR } from "@core/const/time.ts";
import type { Psql } from "@core/db/global.d.ts";
import type { Psql } from "@core/db/psql.d.ts";
export const getRegularSnapshotInterval = async (sql: Psql, aid: number) => {
const now = Date.now();