Compare commits

...

5 Commits

8 changed files with 56 additions and 9 deletions

View File

@ -4,6 +4,7 @@ import logger from "log/logger.ts";
import { MINUTE } from "$std/datetime/constants.ts"; import { MINUTE } from "$std/datetime/constants.ts";
import { redis } from "@core/db/redis.ts"; import { redis } from "@core/db/redis.ts";
import { Redis } from "ioredis"; import { Redis } from "ioredis";
import {parseTimestampFromPsql} from "../utils/formatTimestampToPostgre.ts";
const REDIS_KEY = "cvsa:snapshot_window_counts"; const REDIS_KEY = "cvsa:snapshot_window_counts";
@ -168,6 +169,20 @@ export async function getLatestSnapshot(client: Client, aid: number): Promise<Sn
}; };
} }
export async function getLatestActiveScheduleWithType(client: Client, aid: number, type: string) {
const query: string = `
SELECT *
FROM snapshot_schedule
WHERE aid = $1
AND type = $2
AND (status = 'pending' OR status = 'processing')
ORDER BY started_at DESC
LIMIT 1
`
const res = await client.queryObject<SnapshotScheduleType>(query, [aid, type]);
return res.rows[0];
}
/* /*
* Creates a new snapshot schedule record. * Creates a new snapshot schedule record.
* @param client The database client. * @param client The database client.
@ -181,10 +196,28 @@ export async function scheduleSnapshot(
targetTime: number, targetTime: number,
force: boolean = false, force: boolean = false,
) { ) {
if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return;
let adjustedTime = new Date(targetTime); let adjustedTime = new Date(targetTime);
const hashActiveSchedule = await videoHasActiveScheduleWithType(client, aid, type);
if (type == "milestone" && hashActiveSchedule) {
const latestActiveSchedule = await getLatestActiveScheduleWithType(client, aid, type);
const latestScheduleStartedAt = new Date(parseTimestampFromPsql(latestActiveSchedule.started_at!));
if (latestScheduleStartedAt > adjustedTime) {
await client.queryObject(`
UPDATE snapshot_schedule
SET started_at = $1
WHERE id = $2
`, [adjustedTime, latestActiveSchedule.id]);
logger.log(
`Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`,
"mq",
"fn:scheduleSnapshot",
);
return;
}
}
if (hashActiveSchedule && !force) return;
if (type !== "milestone" && type !== "new") { if (type !== "milestone" && type !== "new") {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); adjustedTime = await adjustSnapshotTime(new Date(targetTime), 2000, redis);
} }
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject( return client.queryObject(

View File

@ -1,6 +1,6 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { db } from "db/init.ts"; import { db } from "db/init.ts";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts"; import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata.ts";
import Akari from "ml/akari.ts"; import Akari from "ml/akari.ts";
import { ClassifyVideoQueue } from "mq/index.ts"; import { ClassifyVideoQueue } from "mq/index.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";

View File

@ -17,7 +17,7 @@ export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise<void> =>
if (eta > 144) continue; if (eta > 144) continue;
const now = Date.now(); const now = Date.now();
const scheduledNextSnapshotDelay = eta * HOUR; const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 4 * HOUR; const maxInterval = 1 * HOUR;
const minInterval = 1 * SECOND; const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay; const targetTime = now + delay;

View File

@ -5,7 +5,7 @@ import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { HOUR, MINUTE, SECOND } from "@std/datetime"; import { HOUR, MINUTE, SECOND } from "@std/datetime";
import { lockManager } from "mq/lockManager.ts"; import { lockManager } from "mq/lockManager.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts"; import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
import { getSongsPublihsedAt } from "db/songs.ts"; import { getSongsPublihsedAt } from "db/songs.ts";
import { getAdjustedShortTermETA } from "mq/scheduling.ts"; import { getAdjustedShortTermETA } from "mq/scheduling.ts";

View File

@ -6,6 +6,20 @@ import { HOUR, MINUTE } from "@std/datetime";
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
const getFactor = (x: number) => {
const a = 1.054;
const b = 4.5;
const c = 100;
const u = 0.601;
const g = 455;
if (x>g) {
return log(b/log(x+1),a);
}
else {
return log(b/log(x+c),a)+u;
}
}
/* /*
* Returns the minimum ETA in hours for the next snapshot * Returns the minimum ETA in hours for the next snapshot
* @param client - Postgres client * @param client - Postgres client
@ -35,8 +49,8 @@ export const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const target = closetMilestone(latestSnapshot.views); const target = closetMilestone(latestSnapshot.views);
const viewsToIncrease = target - latestSnapshot.views; const viewsToIncrease = target - latestSnapshot.views;
const eta = viewsToIncrease / (speed + DELTA); const eta = viewsToIncrease / (speed + DELTA);
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14); let factor = getFactor(viewsToIncrease);
factor = truncate(factor, 3, 100); factor = truncate(factor, 4.5, 100);
const adjustedETA = eta / factor; const adjustedETA = eta / factor;
if (adjustedETA < minETAHours) { if (adjustedETA < minETAHours) {
minETAHours = adjustedETA; minETAHours = adjustedETA;

View File

@ -3,7 +3,7 @@ import { getVideoDetails } from "net/getVideoDetails.ts";
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts"; import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { ClassifyVideoQueue } from "mq/index.ts"; import { ClassifyVideoQueue } from "mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts"; import { userExistsInBiliUsers, videoExistsInAllData } from "../../db/bilibili_metadata.ts";
import { HOUR, SECOND } from "@std/datetime"; import { HOUR, SECOND } from "@std/datetime";
export async function insertVideoInfo(client: Client, aid: number) { export async function insertVideoInfo(client: Client, aid: number) {

View File

@ -1,6 +1,6 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getLatestVideoAids } from "net/getLatestVideoAids.ts"; import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
import { videoExistsInAllData } from "db/allData.ts"; import { videoExistsInAllData } from "../../db/bilibili_metadata.ts";
import { sleep } from "utils/sleep.ts"; import { sleep } from "utils/sleep.ts";
import { SECOND } from "@std/datetime"; import { SECOND } from "@std/datetime";
import logger from "log/logger.ts"; import logger from "log/logger.ts";