fix: bugs of snapshot scheduling
This commit is contained in:
parent
18fc9752bb
commit
8652ac8fb7
@ -1,7 +1,7 @@
|
|||||||
import { DAY, MINUTE } from "$std/datetime/constants.ts";
|
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
import { SnapshotScheduleType } from "./schema.d.ts";
|
import {SnapshotScheduleType} from "./schema.d.ts";
|
||||||
import logger from "../log/logger.ts";
|
import logger from "../log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -98,7 +98,8 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
|
|||||||
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
||||||
*/
|
*/
|
||||||
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
||||||
const adjustedTime = (await adjustSnapshotTime(client, new Date(targetTime)));
|
const allowedCount = type === "milestone" ? 2000 : 800;
|
||||||
|
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
|
||||||
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
||||||
return client.queryObject(
|
return client.queryObject(
|
||||||
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
||||||
@ -110,11 +111,13 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
|
|||||||
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
|
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
|
||||||
* @param client PostgreSQL client
|
* @param client PostgreSQL client
|
||||||
* @param expectedStartTime The expected snapshot time
|
* @param expectedStartTime The expected snapshot time
|
||||||
|
* @param allowedCounts The number of snapshots allowed in the 5-minutes windows.
|
||||||
* @returns The adjusted actual snapshot time
|
* @returns The adjusted actual snapshot time
|
||||||
*/
|
*/
|
||||||
export async function adjustSnapshotTime(
|
export async function adjustSnapshotTime(
|
||||||
client: Client,
|
client: Client,
|
||||||
expectedStartTime: Date,
|
expectedStartTime: Date,
|
||||||
|
allowedCounts: number = 2000
|
||||||
): Promise<Date> {
|
): Promise<Date> {
|
||||||
const findWindowQuery = `
|
const findWindowQuery = `
|
||||||
WITH windows AS (
|
WITH windows AS (
|
||||||
@ -130,30 +133,34 @@ export async function adjustSnapshotTime(
|
|||||||
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
|
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
|
||||||
AND s.status = 'pending'
|
AND s.status = 'pending'
|
||||||
GROUP BY w.window_start
|
GROUP BY w.window_start
|
||||||
HAVING COUNT(s.*) < 2000
|
HAVING COUNT(s.*) < ${allowedCounts}
|
||||||
ORDER BY w.window_start
|
ORDER BY w.window_start
|
||||||
LIMIT 1;
|
LIMIT 1;
|
||||||
`;
|
`;
|
||||||
const now = new Date(new Date().getTime() + 5 * MINUTE);
|
const now = new Date();
|
||||||
const nowTruncated = truncateTo5MinInterval(now);
|
const targetTime = expectedStartTime.getTime();
|
||||||
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
|
let start = new Date(targetTime - 2 * HOUR);
|
||||||
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
|
if (start.getTime() <= now.getTime()) {
|
||||||
|
start = now;
|
||||||
|
}
|
||||||
|
const startTruncated = truncateTo5MinInterval(start);
|
||||||
|
const end = new Date(startTruncated.getTime() + 1 * DAY);
|
||||||
|
|
||||||
const windowResult = await client.queryObject<{ window_start: Date }>(
|
const windowResult = await client.queryObject<{ window_start: Date }>(
|
||||||
findWindowQuery,
|
findWindowQuery,
|
||||||
[nowTruncated, end],
|
[startTruncated, end],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
const windowStart = windowResult.rows[0]?.window_start;
|
const windowStart = windowResult.rows[0]?.window_start;
|
||||||
if (!windowStart) {
|
if (!windowStart) {
|
||||||
return expectedStartTime;
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns windowStart if it is within the next 5 minutes
|
|
||||||
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
|
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
|
||||||
return windowStart
|
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
|
||||||
}
|
return new Date(windowStart.getTime() + randomDelay);
|
||||||
else {
|
} else {
|
||||||
return expectedStartTime;
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -189,8 +196,19 @@ export async function getSnapshotsInNextSecond(client: Client) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
||||||
return client.queryObject(
|
return await client.queryObject(
|
||||||
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
||||||
[id, status],
|
[id, status],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||||
|
const query: string = `
|
||||||
|
SELECT s.aid
|
||||||
|
FROM songs s
|
||||||
|
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||||
|
WHERE ss.aid IS NULL
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||||
|
return res.rows.map((r) => Number(r.aid));
|
||||||
|
}
|
||||||
|
@ -4,7 +4,7 @@ import { getVideosNearMilestone } from "lib/db/snapshot.ts";
|
|||||||
import {
|
import {
|
||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
|
||||||
hasAtLeast2Snapshots,
|
hasAtLeast2Snapshots,
|
||||||
scheduleSnapshot,
|
scheduleSnapshot,
|
||||||
setSnapshotStatus,
|
setSnapshotStatus,
|
||||||
@ -15,13 +15,14 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
|||||||
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
import { SnapshotQueue } from "lib/mq/index.ts";
|
||||||
import { insertVideoSnapshot } from "../task/getVideoStats.ts";
|
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
|
||||||
import { NetSchedulerError } from "../scheduler.ts";
|
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||||
import { setBiliVideoStatus } from "../../db/allData.ts";
|
import { setBiliVideoStatus } from "lib/db/allData.ts";
|
||||||
import {truncate} from "../../utils/truncate.ts";
|
import { truncate } from "lib/utils/truncate.ts";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
|
"normal": 3,
|
||||||
};
|
};
|
||||||
|
|
||||||
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
||||||
@ -73,7 +74,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
|
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
|
||||||
if (!snapshotsEnough) return 0;
|
if (!snapshotsEnough) return 0;
|
||||||
|
|
||||||
const currentTimestamp = new Date().getTime()
|
const currentTimestamp = new Date().getTime();
|
||||||
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
||||||
const DELTA = 0.00001;
|
const DELTA = 0.00001;
|
||||||
let minETAHours = Infinity;
|
let minETAHours = Infinity;
|
||||||
@ -90,7 +91,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const viewsToIncrease = target - latestSnapshot.views;
|
const viewsToIncrease = target - latestSnapshot.views;
|
||||||
const eta = viewsToIncrease / (speed + DELTA);
|
const eta = viewsToIncrease / (speed + DELTA);
|
||||||
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||||
factor = truncate(factor, 3, 100)
|
factor = truncate(factor, 3, 100);
|
||||||
const adjustedETA = eta / factor;
|
const adjustedETA = eta / factor;
|
||||||
if (adjustedETA < minETAHours) {
|
if (adjustedETA < minETAHours) {
|
||||||
minETAHours = adjustedETA;
|
minETAHours = adjustedETA;
|
||||||
@ -104,13 +105,13 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
try {
|
try {
|
||||||
const videos = await getVideosNearMilestone(client);
|
const videos = await getVideosNearMilestone(client);
|
||||||
for (const video of videos) {
|
for (const video of videos) {
|
||||||
const aid = Number(video.aid)
|
const aid = Number(video.aid);
|
||||||
if (await videoHasActiveSchedule(client, aid)) continue;
|
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||||
const eta = await getAdjustedShortTermETA(client, aid);
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) continue;
|
if (eta > 72) continue;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
const maxInterval = 60 * MINUTE;
|
const maxInterval = 4 * HOUR;
|
||||||
const minInterval = 1 * SECOND;
|
const minInterval = 1 * SECOND;
|
||||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
const targetTime = now + delay;
|
const targetTime = now + delay;
|
||||||
@ -123,11 +124,31 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||||
|
const client = await db.connect();
|
||||||
|
try {
|
||||||
|
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
|
||||||
|
for (const rawAid of aids) {
|
||||||
|
const aid = Number(rawAid);
|
||||||
|
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||||
|
const now = Date.now();
|
||||||
|
const targetTime = now + 24 * HOUR;
|
||||||
|
await scheduleSnapshot(client, aid, "normal", targetTime);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||||
const id = job.data.id;
|
const id = job.data.id;
|
||||||
const aid = job.data.aid;
|
const aid = Number(job.data.aid);
|
||||||
const task = snapshotTypeToTaskMap[job.data.type] ?? "snapshotVideo";
|
const type = job.data.type;
|
||||||
|
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
|
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
|
||||||
try {
|
try {
|
||||||
if (await videoHasProcessingSchedule(client, aid)) {
|
if (await videoHasProcessingSchedule(client, aid)) {
|
||||||
return `ALREADY_PROCESSING`;
|
return `ALREADY_PROCESSING`;
|
||||||
@ -139,12 +160,14 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
await setSnapshotStatus(client, id, "completed");
|
await setSnapshotStatus(client, id, "completed");
|
||||||
return `BILI_STATUS_${stat}`;
|
return `BILI_STATUS_${stat}`;
|
||||||
}
|
}
|
||||||
|
await setSnapshotStatus(client, id, "completed");
|
||||||
|
if (type !== "milestone") return `DONE`;
|
||||||
const eta = await getAdjustedShortTermETA(client, aid);
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) return "ETA_TOO_LONG";
|
if (eta > 72) return "ETA_TOO_LONG";
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const targetTime = now + eta * HOUR;
|
const targetTime = now + eta * HOUR;
|
||||||
await setSnapshotStatus(client, id, "completed");
|
await scheduleSnapshot(client, aid, type, targetTime);
|
||||||
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
return `DONE`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
@ -153,7 +176,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
"fn:takeSnapshotForVideoWorker",
|
"fn:takeSnapshotForVideoWorker",
|
||||||
);
|
);
|
||||||
await setSnapshotStatus(client, id, "completed");
|
await setSnapshotStatus(client, id, "completed");
|
||||||
await scheduleSnapshot(client, aid, "milestone", Date.now() + 5 * SECOND);
|
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
|
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
|
||||||
|
@ -24,10 +24,16 @@ export async function initMQ() {
|
|||||||
removeOnFail: 1,
|
removeOnFail: 1,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
|
||||||
|
every: 30 * MINUTE,
|
||||||
|
immediately: true,
|
||||||
|
});
|
||||||
|
|
||||||
logger.log("Message queue initialized.");
|
logger.log("Message queue initialized.");
|
||||||
}
|
}
|
||||||
|
@ -6,7 +6,7 @@ import { lockManager } from "lib/mq/lockManager.ts";
|
|||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "lib/mq/schema.ts";
|
||||||
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
||||||
import {
|
import {
|
||||||
collectMilestoneSnapshotsWorker,
|
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
} from "lib/mq/exec/snapshotTick.ts";
|
} from "lib/mq/exec/snapshotTick.ts";
|
||||||
@ -76,6 +76,9 @@ const snapshotWorker = new Worker(
|
|||||||
case "collectMilestoneSnapshots":
|
case "collectMilestoneSnapshots":
|
||||||
await collectMilestoneSnapshotsWorker(job);
|
await collectMilestoneSnapshotsWorker(job);
|
||||||
break;
|
break;
|
||||||
|
case "dispatchRegularSnapshots":
|
||||||
|
await regularSnapshotsWorker(job);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user