fix: several bugs of snapshot scheduling
This commit is contained in:
parent
2c12310e8c
commit
18fc9752bb
@ -2,6 +2,7 @@ import { DAY, MINUTE } from "$std/datetime/constants.ts";
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
import { SnapshotScheduleType } from "./schema.d.ts";
|
import { SnapshotScheduleType } from "./schema.d.ts";
|
||||||
|
import logger from "../log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
|
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
|
||||||
@ -51,6 +52,14 @@ export async function findClosestSnapshot(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
|
||||||
|
const res = await client.queryObject<{ count: number }>(
|
||||||
|
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
return res.rows[0].count >= 2;
|
||||||
|
}
|
||||||
|
|
||||||
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
|
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
|
||||||
const res = await client.queryObject<{ created_at: string; views: number }>(
|
const res = await client.queryObject<{ created_at: string; views: number }>(
|
||||||
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
|
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
|
||||||
@ -89,10 +98,11 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
|
|||||||
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
||||||
*/
|
*/
|
||||||
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
||||||
const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime));
|
const adjustedTime = (await adjustSnapshotTime(client, new Date(targetTime)));
|
||||||
|
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
||||||
return client.queryObject(
|
return client.queryObject(
|
||||||
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
||||||
[aid, type, ajustedTime.toISOString()],
|
[aid, type, adjustedTime.toISOString()],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,25 +134,28 @@ export async function adjustSnapshotTime(
|
|||||||
ORDER BY w.window_start
|
ORDER BY w.window_start
|
||||||
LIMIT 1;
|
LIMIT 1;
|
||||||
`;
|
`;
|
||||||
for (let i = 0; i < 7; i++) {
|
const now = new Date(new Date().getTime() + 5 * MINUTE);
|
||||||
const now = new Date(new Date().getTime() + 5 * MINUTE);
|
const nowTruncated = truncateTo5MinInterval(now);
|
||||||
const nowTruncated = truncateTo5MinInterval(now);
|
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
|
||||||
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
|
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
|
||||||
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
|
|
||||||
|
|
||||||
const windowResult = await client.queryObject<{ window_start: Date }>(
|
const windowResult = await client.queryObject<{ window_start: Date }>(
|
||||||
findWindowQuery,
|
findWindowQuery,
|
||||||
[nowTruncated, end],
|
[nowTruncated, end],
|
||||||
);
|
);
|
||||||
|
|
||||||
const windowStart = windowResult.rows[0]?.window_start;
|
const windowStart = windowResult.rows[0]?.window_start;
|
||||||
if (!windowStart) {
|
if (!windowStart) {
|
||||||
continue;
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
return windowStart;
|
// Returns windowStart if it is within the next 5 minutes
|
||||||
|
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
|
||||||
|
return windowStart
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
return expectedStartTime;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -5,6 +5,7 @@ import {
|
|||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond,
|
||||||
|
hasAtLeast2Snapshots,
|
||||||
scheduleSnapshot,
|
scheduleSnapshot,
|
||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
videoHasActiveSchedule,
|
videoHasActiveSchedule,
|
||||||
@ -17,6 +18,7 @@ import { SnapshotQueue } from "lib/mq/index.ts";
|
|||||||
import { insertVideoSnapshot } from "../task/getVideoStats.ts";
|
import { insertVideoSnapshot } from "../task/getVideoStats.ts";
|
||||||
import { NetSchedulerError } from "../scheduler.ts";
|
import { NetSchedulerError } from "../scheduler.ts";
|
||||||
import { setBiliVideoStatus } from "../../db/allData.ts";
|
import { setBiliVideoStatus } from "../../db/allData.ts";
|
||||||
|
import {truncate} from "../../utils/truncate.ts";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
@ -36,8 +38,9 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
if (schedule.type && priorityMap[schedule.type]) {
|
if (schedule.type && priorityMap[schedule.type]) {
|
||||||
priority = priorityMap[schedule.type];
|
priority = priorityMap[schedule.type];
|
||||||
}
|
}
|
||||||
|
const aid = Number(schedule.aid);
|
||||||
await SnapshotQueue.add("snapshotVideo", {
|
await SnapshotQueue.add("snapshotVideo", {
|
||||||
aid: schedule.aid,
|
aid: aid,
|
||||||
id: schedule.id,
|
id: schedule.id,
|
||||||
type: schedule.type ?? "normal",
|
type: schedule.type ?? "normal",
|
||||||
}, { priority });
|
}, { priority });
|
||||||
@ -67,8 +70,10 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||||
// Immediately dispatch a snapshot if there is no snapshot yet
|
// Immediately dispatch a snapshot if there is no snapshot yet
|
||||||
if (!latestSnapshot) return 0;
|
if (!latestSnapshot) return 0;
|
||||||
|
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
|
||||||
|
if (!snapshotsEnough) return 0;
|
||||||
|
|
||||||
const currentTimestamp = Date.now();
|
const currentTimestamp = new Date().getTime()
|
||||||
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
||||||
const DELTA = 0.00001;
|
const DELTA = 0.00001;
|
||||||
let minETAHours = Infinity;
|
let minETAHours = Infinity;
|
||||||
@ -77,13 +82,15 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const date = new Date(currentTimestamp - timeInterval);
|
const date = new Date(currentTimestamp - timeInterval);
|
||||||
const snapshot = await findClosestSnapshot(client, aid, date);
|
const snapshot = await findClosestSnapshot(client, aid, date);
|
||||||
if (!snapshot) continue;
|
if (!snapshot) continue;
|
||||||
const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR;
|
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
|
||||||
const viewsDiff = snapshot.views - latestSnapshot.views;
|
const viewsDiff = latestSnapshot.views - snapshot.views;
|
||||||
|
if (viewsDiff <= 0) continue;
|
||||||
const speed = viewsDiff / (hoursDiff + DELTA);
|
const speed = viewsDiff / (hoursDiff + DELTA);
|
||||||
const target = closetMilestone(latestSnapshot.views);
|
const target = closetMilestone(latestSnapshot.views);
|
||||||
const viewsToIncrease = target - latestSnapshot.views;
|
const viewsToIncrease = target - latestSnapshot.views;
|
||||||
const eta = viewsToIncrease / (speed + DELTA);
|
const eta = viewsToIncrease / (speed + DELTA);
|
||||||
const factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||||
|
factor = truncate(factor, 3, 100)
|
||||||
const adjustedETA = eta / factor;
|
const adjustedETA = eta / factor;
|
||||||
if (adjustedETA < minETAHours) {
|
if (adjustedETA < minETAHours) {
|
||||||
minETAHours = adjustedETA;
|
minETAHours = adjustedETA;
|
||||||
@ -97,12 +104,17 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
try {
|
try {
|
||||||
const videos = await getVideosNearMilestone(client);
|
const videos = await getVideosNearMilestone(client);
|
||||||
for (const video of videos) {
|
for (const video of videos) {
|
||||||
if (await videoHasActiveSchedule(client, video.aid)) continue;
|
const aid = Number(video.aid)
|
||||||
const eta = await getAdjustedShortTermETA(client, video.aid);
|
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||||
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) continue;
|
if (eta > 72) continue;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const targetTime = now + eta * HOUR;
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
await scheduleSnapshot(client, video.aid, "milestone", targetTime);
|
const maxInterval = 60 * MINUTE;
|
||||||
|
const minInterval = 1 * SECOND;
|
||||||
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
|
const targetTime = now + delay;
|
||||||
|
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
||||||
|
@ -18,6 +18,11 @@ export async function initMQ() {
|
|||||||
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
|
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
|
||||||
every: 1 * SECOND,
|
every: 1 * SECOND,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
|
}, {
|
||||||
|
opts: {
|
||||||
|
removeOnComplete: 1,
|
||||||
|
removeOnFail: 1,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
|
@ -14,12 +14,14 @@ import {
|
|||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.addSignalListener("SIGTERM", async () => {
|
Deno.addSignalListener("SIGTERM", async () => {
|
||||||
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user