feat: bulk fetch

alikia2x (寒寒) 2025-03-27 02:58:42 +08:00
parent aea9e10d1a
commit d229e49ff2
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
3 changed files with 76 additions and 9 deletions

View File

@@ -84,6 +84,14 @@ export async function videoHasProcessingSchedule(client: Client, aid: number) {
	return res.rows.length > 0;
}

export async function bulkGetVideosWithoutProcessingSchedules(client: Client, aids: number[]) {
	const res = await client.queryObject<{ aid: number }>(
		`SELECT aid FROM snapshot_schedule WHERE aid = ANY($1) AND status != 'processing' GROUP BY aid`,
		[aids],
	);
	return res.rows.map((row) => row.aid);
}

interface Snapshot {
	created_at: number;
	views: number;
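
For context (illustrative, not part of the diff): the new helper matches the whole aids array in a single query via Postgres ANY($1) and returns the subset of aids that have at least one snapshot_schedule row whose status is not 'processing'. A minimal usage sketch, assuming a client checked out from the db pool used elsewhere in this commit and made-up aids:

	const client = await db.connect();
	try {
		// Keeps only the aids with at least one non-'processing' schedule row.
		const idle = await bulkGetVideosWithoutProcessingSchedules(client, [170001, 170002, 170003]);
	} finally {
		client.release();
	}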
@@ -196,6 +204,12 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
	);
}

export async function bulkScheduleSnapshot(client: Client, aids: number[], type: string, targetTime: number, force: boolean = false) {
	for (const aid of aids) {
		await scheduleSnapshot(client, aid, type, targetTime, force);
	}
}

export async function adjustSnapshotTime(
	expectedStartTime: Date,
	allowedCounts: number = 1000,
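
For reference, bulkScheduleSnapshot simply fans the call out to scheduleSnapshot per aid; later in this commit it is used to re-queue an entire failed batch:

	await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);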

View File

@@ -2,6 +2,8 @@ import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
import {
	bulkGetVideosWithoutProcessingSchedules,
	bulkScheduleSnapshot,
	bulkSetSnapshotStatus,
	findClosestSnapshot,
	findSnapshotBefore,
@@ -41,6 +43,22 @@ export const snapshotTickWorker = async (_job: Job) => {
	const client = await db.connect();
	try {
		const schedules = await getSnapshotsInNextSecond(client);
		const count = schedules.length;
		const groups = Math.ceil(count / 30);
		for (let i = 0; i < groups; i++) {
			const group = schedules.slice(i * 30, (i + 1) * 30);
			const aids = group.map((schedule) => schedule.aid);
			const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
			if (filteredAids.length === 0) continue;
			await bulkSetSnapshotStatus(client, filteredAids, "processing");
			const dataMap: { [key: number]: number } = {};
			for (const schedule of group) {
				dataMap[schedule.id] = schedule.aid;
			}
			await SnapshotQueue.add("bulkSnapshotVideo", {
				map: dataMap,
			}, { priority: 3 });
		}
		for (const schedule of schedules) {
			if (await videoHasProcessingSchedule(client, schedule.aid)) {
				return `ALREADY_PROCESSING`;
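
The added block splits the due schedules into batches of 30, filters each batch through bulkGetVideosWithoutProcessingSchedules (skipping the batch if nothing remains), marks the surviving aids as "processing", and enqueues one "bulkSnapshotVideo" job per batch carrying a schedule-id-to-aid map. A minimal sketch of just the chunking arithmetic, pulled out of the worker (helper name is illustrative):

	function chunk<T>(items: T[], size = 30): T[][] {
		const groups: T[][] = [];
		for (let i = 0; i < Math.ceil(items.length / size); i++) {
			groups.push(items.slice(i * size, (i + 1) * size));
		}
		return groups;
	}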
@@ -174,7 +192,7 @@ export const regularSnapshotsWorker = async (_job: Job) => {
			const now = Date.now();
			const lastSnapshotedAt = latestSnapshot?.time ?? now;
			const interval = await getRegularSnapshotInterval(client, aid);
			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
			const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
			await scheduleSnapshot(client, aid, "normal", targetTime);
			if (now - startedAt > 25 * MINUTE) {
@@ -199,20 +217,47 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
			const aid = Number(dataMap[id]);
			const exists = await snapshotScheduleExists(client, id);
			if (!exists) {
				continue;
			}
			aidsToFetch.push(aid);
		}
		const data = await bulkGetVideoStats(aidsToFetch);
		if (typeof data === "number") {
			await bulkSetSnapshotStatus(client, ids, "failed");
			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
			return `GET_BILI_STATUS_${data}`;
		}
		for (const video of data) {
			const aid = video.id;
			const stat = video.cnt_info;
			const views = stat.play;
			const danmakus = stat.danmaku;
			const replies = stat.reply;
			const likes = stat.thumb_up;
			const coins = stat.coin;
			const shares = stat.share;
			const favorites = stat.collect;
			const query: string = `
				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
			`;
			await client.queryObject(
				query,
				[aid, views, danmakus, replies, likes, coins, shares, favorites],
			);
			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
		}
		for (const aid of aidsToFetch) {
			const interval = await getRegularSnapshotInterval(client, aid);
			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
		}
		return `DONE`;
	} finally {
		client.release();
	}
};

export const takeSnapshotForVideoWorker = async (job: Job) => {
	const id = job.data.id;
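
The bulk worker consumes the schedule-id-to-aid map that snapshotTickWorker enqueues (ids and dataMap are presumably read from job.data above this hunk), treats a numeric return from bulkGetVideoStats as an upstream error status (GET_BILI_STATUS_*), and otherwise writes one video_snapshot row per entry. Judging from the field accesses, a successful bulkGetVideoStats result is expected to look roughly like this (type assumed, not declared in this diff):

	interface BulkStatsEntry {
		id: number; // aid
		cnt_info: {
			play: number; // views
			danmaku: number;
			reply: number;
			thumb_up: number; // likes
			coin: number;
			share: number;
			collect: number; // favorites
		};
	}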
@@ -242,7 +287,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
		await setSnapshotStatus(client, id, "completed");
		if (type === "normal") {
			const interval = await getRegularSnapshotInterval(client, aid);
			logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
			await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
			return `DONE`;
		} else if (type === "new") {
@@ -309,7 +354,11 @@ export const scheduleCleanupWorker = async (_job: Job) => {
			const type = row.type;
			await setSnapshotStatus(client, id, "timeout");
			await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
			logger.log(
				`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
				"mq",
				"fn:scheduleCleanupWorker",
			);
		}
	} catch (e) {
		logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");

View File

@@ -10,7 +10,8 @@ import {
	regularSnapshotsWorker,
	snapshotTickWorker,
	takeSnapshotForVideoWorker,
	scheduleCleanupWorker,
	takeBulkSnapshotForVideosWorker
} from "lib/mq/exec/snapshotTick.ts";

Deno.addSignalListener("SIGINT", async () => {
@@ -84,6 +85,9 @@ const snapshotWorker = new Worker(
			case "scheduleCleanup":
				await scheduleCleanupWorker(job);
				break;
			case "bulkSnapshotVideo":
				await takeBulkSnapshotForVideosWorker(job);
				break;
			default:
				break;
		}
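
Wiring summary: snapshotTickWorker enqueues each batch with SnapshotQueue.add("bulkSnapshotVideo", { map: dataMap }, { priority: 3 }), and the snapshotWorker above routes the job by job.name to takeBulkSnapshotForVideosWorker, mirroring the existing "scheduleCleanup" case.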