Compare commits
5 Commits
7689e687ff
...
244298913a
Author | SHA1 | Date | |
---|---|---|---|
244298913a | |||
2c47105913 | |||
6eaaf921d6 | |||
288e4f9571 | |||
907c0a6976 |
@ -4,6 +4,7 @@ import logger from "log/logger.ts";
|
||||
import { MINUTE } from "$std/datetime/constants.ts";
|
||||
import { redis } from "@core/db/redis.ts";
|
||||
import { Redis } from "ioredis";
|
||||
import {parseTimestampFromPsql} from "../utils/formatTimestampToPostgre.ts";
|
||||
|
||||
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
||||
|
||||
@ -168,6 +169,20 @@ export async function getLatestSnapshot(client: Client, aid: number): Promise<Sn
|
||||
};
|
||||
}
|
||||
|
||||
export async function getLatestActiveScheduleWithType(client: Client, aid: number, type: string) {
|
||||
const query: string = `
|
||||
SELECT *
|
||||
FROM snapshot_schedule
|
||||
WHERE aid = $1
|
||||
AND type = $2
|
||||
AND (status = 'pending' OR status = 'processing')
|
||||
ORDER BY started_at DESC
|
||||
LIMIT 1
|
||||
`
|
||||
const res = await client.queryObject<SnapshotScheduleType>(query, [aid, type]);
|
||||
return res.rows[0];
|
||||
}
|
||||
|
||||
/*
|
||||
* Creates a new snapshot schedule record.
|
||||
* @param client The database client.
|
||||
@ -181,10 +196,28 @@ export async function scheduleSnapshot(
|
||||
targetTime: number,
|
||||
force: boolean = false,
|
||||
) {
|
||||
if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return;
|
||||
let adjustedTime = new Date(targetTime);
|
||||
const hashActiveSchedule = await videoHasActiveScheduleWithType(client, aid, type);
|
||||
if (type == "milestone" && hashActiveSchedule) {
|
||||
const latestActiveSchedule = await getLatestActiveScheduleWithType(client, aid, type);
|
||||
const latestScheduleStartedAt = new Date(parseTimestampFromPsql(latestActiveSchedule.started_at!));
|
||||
if (latestScheduleStartedAt > adjustedTime) {
|
||||
await client.queryObject(`
|
||||
UPDATE snapshot_schedule
|
||||
SET started_at = $1
|
||||
WHERE id = $2
|
||||
`, [adjustedTime, latestActiveSchedule.id]);
|
||||
logger.log(
|
||||
`Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`,
|
||||
"mq",
|
||||
"fn:scheduleSnapshot",
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (hashActiveSchedule && !force) return;
|
||||
if (type !== "milestone" && type !== "new") {
|
||||
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
|
||||
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 2000, redis);
|
||||
}
|
||||
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
||||
return client.queryObject(
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { Job } from "bullmq";
|
||||
import { db } from "db/init.ts";
|
||||
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts";
|
||||
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata.ts";
|
||||
import Akari from "ml/akari.ts";
|
||||
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||
import logger from "log/logger.ts";
|
||||
|
@ -17,7 +17,7 @@ export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise<void> =>
|
||||
if (eta > 144) continue;
|
||||
const now = Date.now();
|
||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||
const maxInterval = 4 * HOUR;
|
||||
const maxInterval = 1 * HOUR;
|
||||
const minInterval = 1 * SECOND;
|
||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||
const targetTime = now + delay;
|
||||
|
@ -5,7 +5,7 @@ import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/
|
||||
import logger from "log/logger.ts";
|
||||
import { HOUR, MINUTE, SECOND } from "@std/datetime";
|
||||
import { lockManager } from "mq/lockManager.ts";
|
||||
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
||||
import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
|
||||
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
||||
import { getSongsPublihsedAt } from "db/songs.ts";
|
||||
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
|
||||
|
@ -6,6 +6,20 @@ import { HOUR, MINUTE } from "@std/datetime";
|
||||
|
||||
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
|
||||
|
||||
const getFactor = (x: number) => {
|
||||
const a = 1.054;
|
||||
const b = 4.5;
|
||||
const c = 100;
|
||||
const u = 0.601;
|
||||
const g = 455;
|
||||
if (x>g) {
|
||||
return log(b/log(x+1),a);
|
||||
}
|
||||
else {
|
||||
return log(b/log(x+c),a)+u;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Returns the minimum ETA in hours for the next snapshot
|
||||
* @param client - Postgres client
|
||||
@ -35,8 +49,8 @@ export const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
||||
const target = closetMilestone(latestSnapshot.views);
|
||||
const viewsToIncrease = target - latestSnapshot.views;
|
||||
const eta = viewsToIncrease / (speed + DELTA);
|
||||
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||
factor = truncate(factor, 3, 100);
|
||||
let factor = getFactor(viewsToIncrease);
|
||||
factor = truncate(factor, 4.5, 100);
|
||||
const adjustedETA = eta / factor;
|
||||
if (adjustedETA < minETAHours) {
|
||||
minETAHours = adjustedETA;
|
||||
|
@ -3,7 +3,7 @@ import { getVideoDetails } from "net/getVideoDetails.ts";
|
||||
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
|
||||
import logger from "log/logger.ts";
|
||||
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
|
||||
import { userExistsInBiliUsers, videoExistsInAllData } from "../../db/bilibili_metadata.ts";
|
||||
import { HOUR, SECOND } from "@std/datetime";
|
||||
|
||||
export async function insertVideoInfo(client: Client, aid: number) {
|
||||
|
@ -1,6 +1,6 @@
|
||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
|
||||
import { videoExistsInAllData } from "db/allData.ts";
|
||||
import { videoExistsInAllData } from "../../db/bilibili_metadata.ts";
|
||||
import { sleep } from "utils/sleep.ts";
|
||||
import { SECOND } from "@std/datetime";
|
||||
import logger from "log/logger.ts";
|
||||
|
Loading…
Reference in New Issue
Block a user