feat: full snapshot archiving for all videos

commit 7a7c5cada9
parent 10b761e3db
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
 import { MINUTE } from "$std/datetime/constants.ts";
-import { redis } from "../../core/db/redis.ts";
+import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";
 
 const REDIS_KEY = "cvsa:snapshot_window_counts";
@@ -272,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) {
 
 export async function getBulkSnapshotsInNextSecond(client: Client) {
     const query = `
-        SELECT *
-        FROM snapshot_schedule
-        WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
-        ORDER BY started_at
-        LIMIT 1000;
+        SELECT *
+        FROM snapshot_schedule
+        WHERE (started_at <= NOW() + INTERVAL '15 seconds')
+            AND status = 'pending'
+            AND (type = 'normal' OR type = 'archive')
+        ORDER BY CASE
+                     WHEN type = 'normal' THEN 1
+                     WHEN type = 'archive' THEN 2
+                 END,
+                 started_at
+        LIMIT 1000;
     `;
     const res = await client.queryObject<SnapshotScheduleType>(query, []);
     return res.rows;
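
Within the 1000-row window, the rewritten query now admits `archive` schedules but ranks `normal` ones first, so routine snapshots are never crowded out by the archive backlog. A minimal sketch of the same two-key ordering in TypeScript, with a hypothetical `Schedule` shape standing in for the real row type:

```ts
// Mirrors the SQL "ORDER BY CASE ... END, started_at" in memory.
// The Schedule shape is illustrative, not the actual schema type.
type Schedule = { type: "normal" | "archive"; startedAt: number };

const rank = (t: Schedule["type"]): number => (t === "normal" ? 1 : 2);

function sortLikeQuery(rows: Schedule[]): Schedule[] {
    // normal (rank 1) sorts before archive (rank 2); ties break on startedAt
    return [...rows].sort((a, b) => rank(a.type) - rank(b.type) || a.startedAt - b.startedAt);
}
```
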
@@ -306,3 +312,15 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
     const res = await client.queryObject<{ aid: number }>(query, []);
     return res.rows.map((r) => Number(r.aid));
 }
+
+export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
+    const query: string = `
+        SELECT s.aid
+        FROM bilibili_metadata s
+        LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
+        WHERE ss.aid IS NULL
+    `;
+    const res = await client.queryObject<{ aid: number }>(query, []);
+    return res.rows.map((r) => Number(r.aid));
+}
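
The `LEFT JOIN ... WHERE ss.aid IS NULL` shape is a standard anti-join: it returns every `aid` in `bilibili_metadata` that has no pending or processing schedule, which is exactly the set the archive dispatcher below needs. An equivalent `NOT EXISTS` formulation, shown only as a sketch against the same two tables (not what the commit ships):

```ts
// Sketch: a semantically equivalent anti-join via NOT EXISTS.
const antiJoinQuery = `
    SELECT s.aid
    FROM bilibili_metadata s
    WHERE NOT EXISTS (
        SELECT 1
        FROM snapshot_schedule ss
        WHERE ss.aid = s.aid
          AND ss.status IN ('pending', 'processing')
    )
`;
```
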
@@ -6,7 +6,7 @@ import {
     bulkScheduleSnapshot,
     bulkSetSnapshotStatus,
     findClosestSnapshot,
-    findSnapshotBefore,
+    findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
     getBulkSnapshotsInNextSecond,
     getLatestSnapshot,
     getSnapshotsInNextSecond,
@@ -28,6 +28,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { getSongsPublihsedAt } from "db/songs.ts";
 import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
 import { getAdjustedShortTermETA } from "../scheduling.ts";
+import { SnapshotScheduleType } from "@core/db/schema";
 
 const priorityMap: { [key: string]: number } = {
     "milestone": 1,
@@ -52,13 +53,8 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
             const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
             if (filteredAids.length === 0) continue;
             await bulkSetSnapshotStatus(client, filteredAids, "processing");
-            const dataMap: { [key: number]: number } = {};
-            for (const schedule of group) {
-                const id = Number(schedule.id);
-                dataMap[id] = Number(schedule.aid);
-            }
             await SnapshotQueue.add("bulkSnapshotVideo", {
-                map: dataMap,
+                schedules: group,
             }, { priority: 3 });
         }
         return `OK`
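
The tick worker now hands the consumer the raw schedule rows instead of a derived id-to-aid map. A sketch of the payload change, with a minimal assumed row shape:

```ts
// Payload of the "bulkSnapshotVideo" job before and after this commit (sketch).
type SnapshotScheduleRow = { id: number; aid: number; type: string }; // assumed minimal shape

type OldPayload = { map: Record<number, number> };      // schedule id -> aid only
type NewPayload = { schedules: SnapshotScheduleRow[] }; // full rows, including type

// Carrying full rows lets the consumer read schedule.type directly, which the
// archive flow needs: "archive" schedules must not re-arm themselves.
```
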
@@ -146,6 +142,38 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
     return 6;
 };
 
+export const archiveSnapshotsWorker = async (_job: Job) => {
+    const client = await db.connect();
+    const startedAt = Date.now();
+    if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
+        logger.log("dispatchArchiveSnapshots is already running", "mq");
+        client.release();
+        return;
+    }
+    await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
+    try {
+        const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
+        for (const rawAid of aids) {
+            const aid = Number(rawAid);
+            const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+            const now = Date.now();
+            const lastSnapshotedAt = latestSnapshot?.time ?? now;
+            const interval = 168;
+            logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq");
+            const targetTime = lastSnapshotedAt + interval * HOUR;
+            await scheduleSnapshot(client, aid, "archive", targetTime);
+            if (now - startedAt > 250 * MINUTE) {
+                return;
+            }
+        }
+    } catch (e) {
+        logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
+    } finally {
+        await lockManager.releaseLock("dispatchArchiveSnapshots");
+        client.release();
+    }
+};
+
 export const regularSnapshotsWorker = async (_job: Job) => {
     const client = await db.connect();
     const startedAt = Date.now();
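
The dispatcher schedules an archive snapshot one week (168 hours) after a video's most recent snapshot, treating never-snapshoted videos as snapshoted now, and bails out after roughly 250 minutes so a single run cannot grow unbounded; the `finally` block still releases the lock on that early return. The target-time arithmetic as a self-contained sketch, with `HOUR` as milliseconds per hour to match the `$std/datetime` constants:

```ts
// Sketch of the scheduling arithmetic used by archiveSnapshotsWorker.
const HOUR = 60 * 60 * 1000;
const ARCHIVE_INTERVAL_HOURS = 168; // one week between archive snapshots

function nextArchiveTime(lastSnapshotedAt: number | undefined, now: number): number {
    // A video with no snapshot yet defaults to "now", so its first
    // archive snapshot lands one week out.
    return (lastSnapshotedAt ?? now) + ARCHIVE_INTERVAL_HOURS * HOUR;
}
```
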
@@ -179,13 +207,14 @@ export const regularSnapshotsWorker = async (_job: Job) => {
 };
 
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-    const dataMap: { [key: number]: number } = job.data.map;
-    const ids = Object.keys(dataMap).map((id) => Number(id));
+    const schedules: SnapshotScheduleType[] = job.data.schedules;
+    const ids = schedules.map((schedule) => Number(schedule.id));
     const aidsToFetch: number[] = [];
     const client = await db.connect();
     try {
-        for (const id of ids) {
-            const aid = Number(dataMap[id]);
+        for (const schedule of schedules) {
+            const aid = Number(schedule.aid);
+            const id = Number(schedule.id);
             const exists = await snapshotScheduleExists(client, id);
             if (!exists) {
                 continue;
@@ -220,7 +249,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
             logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
         }
         await bulkSetSnapshotStatus(client, ids, "completed");
-        for (const aid of aidsToFetch) {
+
+        for (const schedule of schedules) {
+            const aid = Number(schedule.aid);
+            const type = schedule.type;
+            if (type == 'archive') continue;
             const interval = await getRegularSnapshotInterval(client, aid);
             logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
             await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
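
After marking the batch completed, only `normal` schedules re-arm themselves; `archive` rows are deliberately one-shot, since the six-hourly dispatcher recreates them for any video left without an active schedule. The rule as a tiny sketch (row shape assumed as above):

```ts
// Re-arm rule applied in the loop above (sketch).
function shouldReschedule(schedule: { type: string }): boolean {
    // "archive" schedules are one-shot; dispatchArchiveSnapshots creates
    // the next one. "normal" schedules re-arm on a per-video interval.
    return schedule.type !== "archive";
}
```
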
@@ -1,4 +1,4 @@
-import { MINUTE, SECOND } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
@@ -55,6 +55,11 @@ export async function initMQ() {
         immediately: true,
     });
 
+    await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
+        every: 6 * HOUR,
+        immediately: true,
+    });
+
     await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
         every: 30 * MINUTE,
         immediately: true,
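
The new `HOUR` import exists because BullMQ's `every` is a repeat interval in milliseconds, and `immediately: true` runs the first iteration at registration time rather than one interval later (assuming BullMQ's documented scheduler semantics). The `$std/datetime` constants make the arithmetic explicit:

```ts
// Values of the $std/datetime constants used for the cadence above.
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";

console.assert(SECOND === 1_000);
console.assert(MINUTE === 60 * SECOND);
console.assert(HOUR === 60 * MINUTE);    // 3_600_000 ms
console.assert(6 * HOUR === 21_600_000); // dispatchArchiveSnapshots cadence
```
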
@@ -6,6 +6,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { WorkerError } from "mq/schema.ts";
 import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
 import {
+    archiveSnapshotsWorker,
     bulkSnapshotTickWorker,
     collectMilestoneSnapshotsWorker,
     regularSnapshotsWorker,
@@ -82,6 +83,8 @@ const snapshotWorker = new Worker(
                 return await takeBulkSnapshotForVideosWorker(job);
             case "bulkSnapshotTick":
                 return await bulkSnapshotTickWorker(job);
+            case "dispatchArchiveSnapshots":
+                return await archiveSnapshotsWorker(job);
             default:
                 break;
         }