update: preparation for snapshotSchedule

parent a6c8fd7f3f
commit b07d0c18f9
@@ -3,7 +3,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { VideoSnapshotType } from "lib/db/schema.d.ts";
 import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
 
-export async function getSongsNearMilestone(client: Client) {
+export async function getVideosNearMilestone(client: Client) {
 	const queryResult = await client.queryObject<VideoSnapshotType>(`
 		WITH max_views_per_aid AS (
 			-- Find the maximum views value for each aid, and make sure the aid exists in the songs table
@@ -1,3 +1,4 @@
+import { DAY, HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 
 /*
@@ -10,3 +11,91 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
 	);
 	return res.rows.length > 0;
 }
+
+interface Snapshot {
+	created_at: Date;
+	views: number;
+}
+
+export async function findClosestSnapshot(
+	client: Client,
+	aid: number,
+	targetTime: Date
+): Promise<Snapshot | null> {
+	const query = `
+		SELECT created_at, views FROM video_snapshot
+		WHERE aid = $1
+		ORDER BY ABS(EXTRACT(EPOCH FROM (created_at - $2::timestamptz))) ASC
+		LIMIT 1
+	`;
+	const result = await client.queryObject<{ created_at: string; views: number }>(
+		query,
+		[aid, targetTime.toISOString()]
+	);
+	if (result.rows.length === 0) return null;
+	const row = result.rows[0];
+	return {
+		created_at: new Date(row.created_at),
+		views: row.views,
+	};
+}
+
+export async function getShortTermTimeFeaturesForVideo(
+	client: Client,
+	aid: number,
+	initialTimestampMiliseconds: number
+): Promise<number[]> {
+	const initialTime = new Date(initialTimestampMiliseconds);
+	const timeWindows = [
+		[ 5 * MINUTE, 0 * MINUTE],
+		[ 15 * MINUTE, 0 * MINUTE],
+		[ 40 * MINUTE, 0 * MINUTE],
+		[ 1 * HOUR, 0 * HOUR],
+		[ 2 * HOUR, 1 * HOUR],
+		[ 3 * HOUR, 2 * HOUR],
+		[ 3 * HOUR, 0 * HOUR],
+		[ 6 * HOUR, 0 * HOUR],
+		[18 * HOUR, 12 * HOUR],
+		[ 1 * DAY, 0 * DAY],
+		[ 3 * DAY, 0 * DAY],
+		[ 7 * DAY, 0 * DAY]
+	];
+
+	const results: number[] = [];
+
+	for (const [windowStart, windowEnd] of timeWindows) {
+		const targetTimeStart = new Date(initialTime.getTime() - windowStart);
+		const targetTimeEnd = new Date(initialTime.getTime() - windowEnd);
+
+		const startRecord = await findClosestSnapshot(client, aid, targetTimeStart);
+		const endRecord = await findClosestSnapshot(client, aid, targetTimeEnd);
+
+		if (!startRecord || !endRecord) {
+			results.push(NaN);
+			continue;
+		}
+
+		const timeDiffSeconds =
+			(endRecord.created_at.getTime() - startRecord.created_at.getTime()) / 1000;
+		const windowDuration = windowStart - windowEnd;
+
+		let scale = 0;
+		if (windowDuration > 0) {
+			scale = timeDiffSeconds / windowDuration;
+		}
+
+		const viewsDiff = endRecord.views - startRecord.views;
+		const adjustedViews = Math.max(viewsDiff, 1);
+
+		let result: number;
+		if (scale > 0) {
+			result = Math.log2(adjustedViews / scale + 1);
+		} else {
+			result = Math.log2(adjustedViews + 1);
+		}
+
+		results.push(result);
+	}
+
+	return results;
+}
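
Note: a minimal usage sketch of the two helpers added above (not part of this commit; it assumes the pooled client exported from lib/db/init.ts and reuses the example aid from the test added later in this commit):

import { db } from "lib/db/init.ts";
import { findClosestSnapshot, getShortTermTimeFeaturesForVideo } from "lib/db/snapshotSchedule.ts";

const client = await db.connect();
try {
	// Closest stored snapshot to a target time, or null if no snapshot exists for this aid.
	const snapshot = await findClosestSnapshot(client, 247308539, new Date());
	// Twelve log2-scaled growth features, one per time window; NaN when a window boundary has no snapshot.
	const features = await getShortTermTimeFeaturesForVideo(client, 247308539, Date.now());
	console.log(snapshot, features);
} finally {
	client.release();
}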
@@ -5,8 +5,8 @@ import { WorkerError } from "lib/mq/schema.ts";
 import { AutoTokenizer, PreTrainedTokenizer } from "@huggingface/transformers";
 
 const tokenizerModel = "alikia2x/jina-embedding-v3-m2v-1024";
-const onnxClassifierPath = "./model/video_classifier_v3_17.onnx";
-const onnxEmbeddingPath = "./model/model.onnx";
+const onnxClassifierPath = "./model/akari/3.17.onnx";
+const onnxEmbeddingPath = "./model/embedding/model.onnx";
 
 class AkariProto extends AIManager {
 	private tokenizer: PreTrainedTokenizer | null = null;
@@ -17,11 +17,11 @@ class AkariProto extends AIManager {
 		this.models = {
 			"classifier": onnxClassifierPath,
 			"embedding": onnxEmbeddingPath,
-		}
+		};
 	}
 
 	public override async init(): Promise<void> {
-		super.init();
+		await super.init();
 		await this.initJinaTokenizer();
 	}
 
@@ -54,13 +54,13 @@ class AkariProto extends AIManager {
 
 		const { input_ids } = await tokenizer(texts, {
 			add_special_tokens: false,
-			return_tensors: "js",
+			return_tensor: false,
 		});
 
 		const cumsum = (arr: number[]): number[] =>
 			arr.reduce((acc: number[], num: number, i: number) => [...acc, num + (acc[i - 1] || 0)], []);
 
-		const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string[]) => x.length))];
+		const offsets: number[] = [0, ...cumsum(input_ids.slice(0, -1).map((x: string) => x.length))];
 		const flattened_input_ids = input_ids.flat();
 
 		const inputs = {
@@ -85,14 +85,16 @@ class AkariProto extends AIManager {
 		return this.softmax(logits.data as Float32Array);
 	}
 
-	public async classifyVideo(title: string, description: string, tags: string, aid: number): Promise<number> {
+	public async classifyVideo(title: string, description: string, tags: string, aid?: number): Promise<number> {
 		const embeddings = await this.getJinaEmbeddings1024([
 			title,
 			description,
 			tags,
 		]);
 		const probabilities = await this.runClassification(embeddings);
+		if (aid) {
 			logger.log(`Prediction result for aid: ${aid}: [${probabilities.map((p) => p.toFixed(5))}]`, "ml");
+		}
 		return probabilities.indexOf(Math.max(...probabilities));
 	}
 
@@ -103,4 +105,3 @@ class AkariProto extends AIManager {
 
 const Akari = new AkariProto();
 export default Akari;
-
@@ -22,7 +22,7 @@ export class AIManager {
 	}
 
 	public getModelSession(key: string): ort.InferenceSession {
-		if (!this.sessions[key]) {
+		if (this.sessions[key] === undefined) {
 			throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession");
 		}
 		return this.sessions[key];
lib/ml/mantis.ts (new file, 25 lines)
@@ -0,0 +1,25 @@
+import { AIManager } from "lib/ml/manager.ts";
+import * as ort from "onnxruntime";
+import logger from "lib/log/logger.ts";
+import { WorkerError } from "lib/mq/schema.ts";
+
+const modelPath = "./model/model.onnx";
+
+class MantisProto extends AIManager {
+
+	constructor() {
+		super();
+		this.models = {
+			"predictor": modelPath,
+		}
+	}
+
+	public override async init(): Promise<void> {
+		await super.init();
+	}
+
+
+}
+
+const Mantis = new MantisProto();
+export default Mantis;
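
Note: a usage sketch for the new Mantis stub (not part of this commit; it assumes AIManager.init() creates an onnxruntime session for each entry of this.models, as implied by getModelSession in the lib/ml/manager.ts hunk above):

import Mantis from "lib/ml/mantis.ts";

await Mantis.init();
// Throws WorkerError("Model predictor not found / not initialized.") if the session was never created.
const session = Mantis.getModelSession("predictor");
console.log(session.inputNames, session.outputNames);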
@@ -1,229 +1,31 @@
 import { Job } from "bullmq";
-import { MINUTE, SECOND } from "$std/datetime/constants.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { db } from "lib/db/init.ts";
-import {
-	getShortTermEtaPrediction,
-	getSongsNearMilestone,
-	getUnsnapshotedSongs,
-	songEligibleForMilestoneSnapshot,
-} from "lib/db/snapshot.ts";
-import { SnapshotQueue } from "lib/mq/index.ts";
-import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
-import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
-import { redis } from "lib/db/redis.ts";
-import { NetSchedulerError } from "lib/mq/scheduler.ts";
-import logger from "lib/log/logger.ts";
-import { formatSeconds } from "lib/utils/formatSeconds.ts";
-import { truncate } from "lib/utils/truncate.ts";
+import { getVideosNearMilestone } from "lib/db/snapshot.ts";
+import { videoHasActiveSchedule } from "lib/db/snapshotSchedule.ts";
 
-async function snapshotScheduled(aid: number) {
-	try {
-		return await redis.exists(`cvsa:snapshot:${aid}`);
-	} catch {
-		logger.error(`Failed to check scheduled status for ${aid}`, "mq");
-		return false;
-	}
-}
-
-async function setSnapshotScheduled(aid: number, value: boolean, exp: number) {
-	try {
-		if (value) {
-			await redis.set(`cvsa:snapshot:${aid}`, 1, "EX", exp);
-		} else {
-			await redis.del(`cvsa:snapshot:${aid}`);
-		}
-	} catch {
-		logger.error(`Failed to set scheduled status to ${value} for ${aid}`, "mq");
-	}
-}
-
-interface SongNearMilestone {
-	aid: number;
-	id: number;
-	created_at: string;
-	views: number;
-	coins: number;
-	likes: number;
-	favorites: number;
-	shares: number;
-	danmakus: number;
-	replies: number;
-}
-
-async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) {
-	let i = 0;
-	for (const snapshot of vidoesNearMilestone) {
-		if (await snapshotScheduled(snapshot.aid)) {
-			logger.silly(
-				`Video ${snapshot.aid} is already scheduled for snapshot`,
-				"mq",
-				"fn:processMilestoneSnapshots",
-			);
-			continue;
-		}
-		if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
-			logger.silly(
-				`Video ${snapshot.aid} is not eligible for milestone snapshot`,
-				"mq",
-				"fn:processMilestoneSnapshots",
-			);
-			continue;
-		}
-		const factor = Math.floor(i / 8);
-		const delayTime = factor * SECOND * 2;
-		await SnapshotQueue.add("snapshotMilestoneVideo", {
-			aid: snapshot.aid,
-			currentViews: snapshot.views,
-			snapshotedAt: snapshot.created_at,
-		}, { delay: delayTime, priority: 1 });
-		await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
-		i++;
-	}
-}
-
-async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
-	let i = 0;
-	for (const aid of unsnapshotedVideos) {
-		if (await snapshotScheduled(aid)) {
-			logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
-			continue;
-		}
-		const factor = Math.floor(i / 5);
-		const delayTime = factor * SECOND * 4;
-		await SnapshotQueue.add("snapshotVideo", {
-			aid,
-		}, { delay: delayTime, priority: 3 });
-		await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-		i++;
-	}
-}
-
 export const snapshotTickWorker = async (_job: Job) => {
 	const client = await db.connect();
 	try {
-		const vidoesNearMilestone = await getSongsNearMilestone(client);
-		await processMilestoneSnapshots(client, vidoesNearMilestone);
-
-		const unsnapshotedVideos = await getUnsnapshotedSongs(client);
-		await processUnsnapshotedVideos(unsnapshotedVideos);
+		// TODO: implement
 	} finally {
 		client.release();
 	}
 };
 
-export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
+export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
 	const client = await db.connect();
-	await setSnapshotScheduled(job.data.aid, true, 20 * 60);
 	try {
-		const aid: number = job.data.aid;
-		const currentViews: number = job.data.currentViews;
-		const lastSnapshoted: string = job.data.snapshotedAt;
-		const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
-		if (typeof stat === "number") {
-			if (stat === -404 || stat === 62002 || stat == 62012) {
-				await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-			} else {
-				await setSnapshotScheduled(aid, false, 0);
-			}
+		const videos = await getVideosNearMilestone(client);
+		for (const video of videos) {
+			if (await videoHasActiveSchedule(client, video.aid)) continue;
 		}
-			return;
-		}
-		const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
-		if (stat.views >= nextMilestone) {
-			await setSnapshotScheduled(aid, false, 0);
-			return;
-		}
-		let eta = await getShortTermEtaPrediction(client, aid);
-		if (eta === null) {
-			const DELTA = 0.001;
-			const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
-			const viewsIncrement = stat.views - currentViews;
-			const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
-			const viewsToIncrease = nextMilestone - stat.views;
-			eta = viewsToIncrease / (incrementSpeed + DELTA);
-		}
-		const scheduledNextSnapshotDelay = eta * SECOND / 3;
-		const maxInterval = 20 * MINUTE;
-		const minInterval = 1 * SECOND;
-		const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
-		await SnapshotQueue.add("snapshotMilestoneVideo", {
-			aid,
-			currentViews: stat.views,
-			snapshotedAt: stat.time,
-		}, { delay, priority: 1 });
-		await job.updateData({
-			...job.data,
-			updatedViews: stat.views,
-			updatedTime: new Date(stat.time).toISOString(),
-			etaInMins: eta / 60,
-		});
-		logger.log(
-			`Scheduled next milestone snapshot for ${aid} in ${
-				formatSeconds(delay / 1000)
-			}, current views: ${stat.views}`,
-			"mq",
-		);
-	} catch (e) {
-		if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
-			logger.warn(
-				`No available proxy for aid ${job.data.aid}.`,
-				"mq",
-				"fn:takeSnapshotForMilestoneVideoWorker",
-			);
-			await SnapshotQueue.add("snapshotMilestoneVideo", {
-				aid: job.data.aid,
-				currentViews: job.data.currentViews,
-				snapshotedAt: job.data.snapshotedAt,
-			}, { delay: 5 * SECOND, priority: 1 });
-			return;
-		}
-		throw e;
+	} catch (_e) {
+		//
 	} finally {
 		client.release();
 	}
 };
 
-export const takeSnapshotForVideoWorker = async (job: Job) => {
-	const client = await db.connect();
-	await setSnapshotScheduled(job.data.aid, true, 6 * 60 * 60);
-	try {
-		const { aid } = job.data;
-		const stat = await insertVideoStats(client, aid, "getVideoInfo");
-		if (typeof stat === "number") {
-			if (stat === -404 || stat === 62002 || stat == 62012) {
-				await setSnapshotScheduled(aid, true, 6 * 60 * 60);
-			} else {
-				await setSnapshotScheduled(aid, false, 0);
-			}
-			return;
-		}
-		logger.log(`Taken snapshot for ${aid}`, "mq");
-		if (stat == null) {
-			setSnapshotScheduled(aid, false, 0);
-			return;
-		}
-		await job.updateData({
-			...job.data,
-			updatedViews: stat.views,
-			updatedTime: new Date(stat.time).toISOString(),
-		});
-		const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
-			(stat.views >= 900000 && stat.views < 1000000);
-		if (nearMilestone) {
-			await SnapshotQueue.add("snapshotMilestoneVideo", {
-				aid,
-				currentViews: stat.views,
-				snapshotedAt: stat.time,
-			}, { delay: 0, priority: 1 });
-		}
-		await setSnapshotScheduled(aid, false, 0);
-	} catch (e) {
-		if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
-			await setSnapshotScheduled(job.data.aid, false, 0);
-			return;
-		}
-		throw e;
-	} finally {
-		client.release();
-	}
+export const takeSnapshotForVideoWorker = async (_job: Job) => {
+	// TODO: implement
 };
@@ -19,6 +19,10 @@ export async function initMQ() {
 		every: 1 * SECOND,
 		immediately: true,
 	});
+	await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
+		every: 5 * MINUTE,
+		immediately: true,
+	});
 
 	logger.log("Message queue initialized.");
 }
@@ -4,20 +4,20 @@ from model import CompactPredictor
 import torch
 
 def main():
-    model = CompactPredictor(16).to('cpu', dtype=torch.float32)
-    model.load_state_dict(torch.load('./pred/checkpoints/model_20250315_0530.pt'))
+    model = CompactPredictor(10).to('cpu', dtype=torch.float32)
+    model.load_state_dict(torch.load('./pred/checkpoints/long_term.pt'))
     model.eval()
     # inference
-    initial = 999269
+    initial = 997029
    last = initial
-    start_time = '2025-03-15 01:03:21'
-    for i in range(1, 48):
+    start_time = '2025-03-17 00:13:17'
+    for i in range(1, 120):
        hour = i / 0.5
        sec = hour * 3600
        time_d = np.log2(sec)
        data = [time_d, np.log2(initial+1), # time_delta, current_views
-                2.801318, 3.455128, 3.903391, 3.995577, 4.641488, 5.75131, 6.723868, 6.105322, 8.141023, 9.576701, 10.665067, # grows_feat
-                0.043993, 0.72057, 28.000902 # time_feat
+                6.111542, 8.404707, 10.071566, 11.55888, 12.457823, # grows_feat
+                0.009225, 0.001318, 28.001814 # time_feat
        ]
        np_arr = np.array([data])
        tensor = torch.from_numpy(np_arr).to('cpu', dtype=torch.float32)
@@ -25,7 +25,7 @@ def main():
        num = output.detach().numpy()[0][0]
        views_pred = int(np.exp2(num)) + initial
        current_time = datetime.datetime.strptime(start_time, '%Y-%m-%d %H:%M:%S') + datetime.timedelta(hours=hour)
-        print(current_time.strftime('%m-%d %H:%M'), views_pred, views_pred - last)
+        print(current_time.strftime('%m-%d %H:%M:%S'), views_pred, views_pred - last)
        last = views_pred
 
 if __name__ == '__main__':
@@ -5,7 +5,7 @@ import logger from "lib/log/logger.ts";
 import { lockManager } from "lib/mq/lockManager.ts";
 import { WorkerError } from "lib/mq/schema.ts";
 import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
-import { snapshotTickWorker, takeSnapshotForMilestoneVideoWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
+import { snapshotTickWorker, collectMilestoneSnapshotsWorker, takeSnapshotForVideoWorker } from "lib/mq/exec/snapshotTick.ts";
 
 Deno.addSignalListener("SIGINT", async () => {
 	logger.log("SIGINT Received: Shutting down workers...", "mq");
@@ -56,15 +56,15 @@ const snapshotWorker = new Worker(
 	"snapshot",
 	async (job: Job) => {
 		switch (job.name) {
-			case "snapshotMilestoneVideo":
-				await takeSnapshotForMilestoneVideoWorker(job);
-				break;
 			case "snapshotVideo":
 				await takeSnapshotForVideoWorker(job);
 				break;
 			case "snapshotTick":
 				await snapshotTickWorker(job);
 				break;
+			case "collectMilestoneSnapshots":
+				await collectMilestoneSnapshotsWorker(job);
+				break;
 			default:
 				break;
 		}
test/db/snapshotSchedule.test.ts (new file, 18 lines)
@@ -0,0 +1,18 @@
+import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
+import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
+import { postgresConfig } from "lib/db/pgConfig.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+
+Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
+	const client = new Client(postgresConfig);
+	try {
+		const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
+		assertNotEquals(result, null);
+		const created_at = result!.created_at;
+		const views = result!.views;
+		assertInstanceOf(created_at, Date);
+		assertEquals(typeof views, "number");
+	} finally {
+		client.end();
+	}
+});
test/ml/akari.json (new file, 22 lines)
@@ -0,0 +1,22 @@
+{
+	"test1": [
+		{
+			"title": "【洛天依】《一花依世界》(2024重调版)|“抬头仰望,夜空多安详”【原创PV付】",
+			"desc": "本家:BV1Vs411H7JH\n作曲:LS\n作词:杏花包子\n调教:鬼面P\n混音:虎皮猫P\n演唱:洛天依\n曲绘:山下鸭鸭窝\n映像:阿妍\n——————————————————————\n本稿为同人二创,非本家重制",
+			"tags": "发现《一花依世界》, Vsinger创作激励计划, 洛天依, VOCALOID CHINA, 翻唱, 原创PV付, ACE虚拟歌姬, 中文VOCALOID, 国风电子, 一花依世界, ACE Studio, Vsinger创作激励计划2024冬季物语",
+			"label": 2
+		},
+		{
+			"title": "【鏡音レン】アカシア【VOCALOID Cover】",
+			"desc": "鏡音リン・レン 13th Anniversary\n\nMusic:BUMP OF CHICKEN https://youtu.be/BoZ0Zwab6Oc\nust:Maplestyle sm37853236\nOff Vocal: https://youtu.be/YMzrUzq1uX0\nSinger:鏡音レン\n\n氷雨ハルカ\nYoutube :https://t.co/8zuv6g7Acm\nniconico:https://t.co/C6DRfdYAp0\ntwitter :https://twitter.com/hisame_haruka\n\n転載禁止\nPlease do not reprint without my permission.",
+			"tags": "鏡音レン",
+			"label": 0
+		},
+		{
+			"title": "【洛天依原创曲】谪星【姆斯塔之谕】",
+			"desc": "谪星\n\n策划/世界观:听雨\n作词:听雨\n作曲/编曲:太白\n混音:虎皮猫\n人设:以木\n曲绘:Ar极光\n调校:哈士奇p\n视频:苏卿白",
+			"tags": "2025虚拟歌手贺岁纪, 洛天依, 原创歌曲, VOCALOID, 虚拟歌手, 原创音乐, 姆斯塔, 中文VOCALOID",
+			"label": 1
+		}
+	]
+}
test/ml/akari.test.ts (new file, 46 lines)
@@ -0,0 +1,46 @@
+import Akari from "lib/ml/akari.ts";
+import { assertEquals, assertGreaterOrEqual } from "jsr:@std/assert";
+import { join } from "$std/path/join.ts";
+import { SECOND } from "$std/datetime/constants.ts";
+
+Deno.test("Akari AI - normal cases accuracy test", async () => {
+	const path = import.meta.dirname!;
+	const dataPath = join(path, "akari.json");
+	const rawData = await Deno.readTextFile(dataPath);
+	const data = JSON.parse(rawData);
+	await Akari.init();
+	for (const testCase of data.test1) {
+		const result = await Akari.classifyVideo(
+			testCase.title,
+			testCase.desc,
+			testCase.tags
+		);
+		assertEquals(result, testCase.label);
+	}
+});
+
+Deno.test("Akari AI - performance test", async () => {
+	const path = import.meta.dirname!;
+	const dataPath = join(path, "akari.json");
+	const rawData = await Deno.readTextFile(dataPath);
+	const data = JSON.parse(rawData);
+	await Akari.init();
+	const N = 200;
+	const testCase = data.test1[0];
+	const title = testCase.title;
+	const desc = testCase.desc;
+	const tags = testCase.tags;
+	const time = performance.now();
+	for (let i = 0; i < N; i++){
+		await Akari.classifyVideo(
+			title,
+			desc,
+			tags
+		);
+	}
+	const end = performance.now();
+	const elapsed = (end - time) / SECOND;
+	const throughput = N / elapsed;
+	assertGreaterOrEqual(throughput, 100);
+	console.log(`Akari AI throughput: ${throughput.toFixed(1)} samples / sec`)
+});