Compare commits

...

31 Commits

Author SHA1 Message Date
d44ba8a0ae
fix: incorrect offset calculation in adjustSnapshotTime 2025-03-27 02:13:33 +08:00
d6dd4d9334
fix: potential shifting for obtained window offset in last commit 2025-03-27 02:09:48 +08:00
d5c278ae06
improve: cache result for adjustSnapshotTime 2025-03-27 01:48:10 +08:00
c80b047e0c
fix: did not release db client before quiting 2025-03-27 01:31:54 +08:00
92678066a7
update: forgot to plus one for iteration count 2025-03-26 23:37:12 +08:00
afee7f58bf
update: return when meeting non-0 video status code in snapshotVideo job 2025-03-26 23:25:07 +08:00
5450c17e13
update: reduce rate limit, no longer collect deleted videos for milestone monitoring 2025-03-26 23:13:53 +08:00
2c51c3c09c
add: log for getRegularSnapshotInterval 2025-03-26 23:04:08 +08:00
7adc370ba2
update: increase concurrency of snapshot worker 2025-03-26 23:00:45 +08:00
b286a9d7b1
update: use findSnapshotBefore instead of findClosetSnapshot in regular snapshot scheduling 2025-03-26 22:59:52 +08:00
cabb360a16
fix: missing entrance for schedule type 'new'
update: perf log text
2025-03-25 22:58:56 +08:00
17ded63758
update: perf monitor for adjustSnapshotTime 2025-03-25 22:38:57 +08:00
2d8e990bc9
fix: prevent duplicate jobs added to queue 2025-03-25 21:33:58 +08:00
b33fd790d1
fix: snapshot process early returned incorrectly 2025-03-25 21:29:47 +08:00
86337e3802
add: set scheduled job status to 'processing' before adding to bullmq 2025-03-25 05:05:33 +08:00
76c9256662
test: looser rate limits for snapshot 2025-03-25 04:57:24 +08:00
a178b7fc16
update: force schedule for new songs 2025-03-25 04:52:19 +08:00
6b7142a6d5
fix: new jobs may be scheduled before the current time 2025-03-24 23:43:54 +08:00
314beb54b5
add: upsertJob for scheduleCleanup 2025-03-24 23:39:29 +08:00
d8b47f8fc8
add: log for scheduleCleanupWorker 2025-03-24 23:37:46 +08:00
fa058b22fe
fix: job consumption rate too low, add outdated job cleanup 2025-03-24 23:36:01 +08:00
48b1130cba
feat: continuous monitoring of new songs 2025-03-24 04:53:43 +08:00
42db333d1a
temp: schedule for new songs 2025-03-24 04:40:10 +08:00
0a73d28623
fix: incorrect time base when adjusting snapshot time 2025-03-24 04:15:51 +08:00
584a1be9f9
test: debug for adjusting snapshot time 2025-03-24 04:08:34 +08:00
20731c0530
test: debug for init window 2025-03-24 04:05:22 +08:00
cb573e55d9
fix: incorrect offset when initializing 2025-03-24 04:01:44 +08:00
8be68248df
fix: ignored the case where snapshots are actually the same 2025-03-24 03:49:06 +08:00
6f4a26e8b3
test: dynamic interval for reglar snapshots 2025-03-24 03:43:26 +08:00
c99318e2d3
update: missing env in deno job 2025-03-24 02:39:57 +08:00
3028dc13c7
improve: performance when dispatching jobs 2025-03-24 02:35:55 +08:00
13 changed files with 334 additions and 134 deletions

View File

@ -12,7 +12,7 @@
"update": "deno run -A -r https://fresh.deno.dev/update .", "update": "deno run -A -r https://fresh.deno.dev/update .",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts", "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts", "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts", "adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts", "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'", "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run" "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"

View File

@ -75,3 +75,13 @@ export async function setBiliVideoStatus(client: Client, aid: number, status: nu
[status, aid], [status, aid],
); );
} }
export async function getBiliVideoStatus(client: Client, aid: number) {
const queryResult = await client.queryObject<{ status: number }>(
`SELECT status FROM bilibili_metadata WHERE aid = $1`,
[aid],
);
const rows = queryResult.rows;
if (rows.length === 0) return 0;
return rows[0].status;
}

View File

@ -7,6 +7,7 @@ export async function getVideosNearMilestone(client: Client) {
FROM latest_video_snapshot ls FROM latest_video_snapshot ls
INNER JOIN INNER JOIN
songs s ON ls.aid = s.aid songs s ON ls.aid = s.aid
AND s.deleted = false
WHERE WHERE
s.deleted = false AND s.deleted = false AND
(views >= 90000 AND views < 100000) OR (views >= 90000 AND views < 100000) OR

View File

@ -3,6 +3,62 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { SnapshotScheduleType } from "./schema.d.ts"; import { SnapshotScheduleType } from "./schema.d.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { MINUTE } from "$std/datetime/constants.ts"; import { MINUTE } from "$std/datetime/constants.ts";
import { redis } from "lib/db/redis.ts";
import { Redis } from "ioredis";
const WINDOW_SIZE = 2880;
const REDIS_KEY = "cvsa:snapshot_window_counts";
let lastAvailableWindow: { offset: number; count: number } | null = null;
function getCurrentWindowIndex(): number {
const now = new Date();
const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
const currentWindow = Math.floor(minutesSinceMidnight / 5);
return currentWindow;
}
export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) {
const now = new Date();
const startTime = now.getTime();
const result = await client.queryObject<{ window_start: Date; count: number }>`
SELECT
date_trunc('hour', started_at) +
(EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
COUNT(*) AS count
FROM snapshot_schedule
WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
GROUP BY 1
ORDER BY window_start
`
await redisClient.del(REDIS_KEY);
const currentWindow = getCurrentWindowIndex();
for (const row of result.rows) {
const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
const offset = (currentWindow + targetOffset);
if (offset >= 0 && offset < WINDOW_SIZE) {
await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
}
}
lastAvailableWindow = null;
}
export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
await refreshSnapshotWindowCounts(client, redisClient);
setInterval(async () => {
await refreshSnapshotWindowCounts(client, redisClient);
}, 5 * MINUTE);
}
async function getWindowCount(redisClient: Redis, offset: number): Promise<number> {
const count = await redisClient.hget(REDIS_KEY, offset.toString());
return count ? parseInt(count, 10) : 0;
}
export async function snapshotScheduleExists(client: Client, id: number) { export async function snapshotScheduleExists(client: Client, id: number) {
const res = await client.queryObject<{ id: number }>( const res = await client.queryObject<{ id: number }>(
@ -12,9 +68,6 @@ export async function snapshotScheduleExists(client: Client, id: number) {
return res.rows.length > 0; return res.rows.length > 0;
} }
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) { export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>( const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`, `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
@ -60,6 +113,31 @@ export async function findClosestSnapshot(
}; };
} }
export async function findSnapshotBefore(
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
AND created_at <= $2::timestamptz
ORDER BY created_at DESC
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
export async function hasAtLeast2Snapshots(client: Client, aid: number) { export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>( const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`, `SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
@ -105,10 +183,12 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
* @param aid The aid of the video. * @param aid The aid of the video.
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds) * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/ */
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) {
if (await videoHasActiveSchedule(client, aid)) return; if (await videoHasActiveSchedule(client, aid) && !force) return;
const allowedCount = type === "milestone" ? 2000 : 800; let adjustedTime = new Date(targetTime);
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount); if (type !== "milestone" && type !== "new") {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
}
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject( return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`, `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
@ -116,76 +196,56 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
); );
} }
/**
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in a 5-minute window (default: 2000)
* @returns The adjusted actual snapshot time within the first available window
*/
export async function adjustSnapshotTime( export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date, expectedStartTime: Date,
allowedCounts: number = 2000, allowedCounts: number = 1000,
redisClient: Redis,
): Promise<Date> { ): Promise<Date> {
// Query to find the closest available window by checking both past and future windows const currentWindow = getCurrentWindowIndex();
const findWindowQuery = ` const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
WITH base AS (
SELECT
date_trunc('minute', $1::timestamp)
- (EXTRACT(minute FROM $1::timestamp)::int % 5 * INTERVAL '1 minute') AS base_time
),
offsets AS (
SELECT generate_series(-100, 100) AS "offset"
),
candidate_windows AS (
SELECT
(base.base_time + ("offset" * INTERVAL '5 minutes')) AS window_start,
ABS("offset") AS distance
FROM base
CROSS JOIN offsets
)
SELECT
window_start
FROM
candidate_windows cw
LEFT JOIN
snapshot_schedule s
ON
s.started_at >= cw.window_start
AND s.started_at < cw.window_start + INTERVAL '5 minutes'
AND s.status = 'pending'
GROUP BY
cw.window_start, cw.distance
HAVING
COUNT(s.*) < $2
ORDER BY
cw.distance, cw.window_start
LIMIT 1;
`;
try { let initialOffset = currentWindow + Math.max(targetOffset, 0);
// Execute query to find the first available window
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[expectedStartTime, allowedCounts],
);
// If no available window found, return original time (may exceed limit) if (lastAvailableWindow && lastAvailableWindow.count < allowedCounts) {
if (windowResult.rows.length === 0) { initialOffset = Math.max(lastAvailableWindow.offset - 2, 0);
}
let timePerIteration = 0;
const t = performance.now();
for (let i = initialOffset; i < WINDOW_SIZE; i++) {
const offset = i;
const count = await getWindowCount(redisClient, offset);
if (count < allowedCounts) {
const newCount = await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
lastAvailableWindow = { offset, count: newCount };
const startPoint = new Date();
startPoint.setHours(0, 0, 0, 0);
const startTime = startPoint.getTime();
const windowStart = startTime + offset * 5 * MINUTE;
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
const delayedDate = new Date(windowStart + randomDelay);
const now = new Date();
if (delayedDate.getTime() < now.getTime()) {
const elapsed = performance.now() - t;
timePerIteration = elapsed / (i+1);
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
return now;
}
const elapsed = performance.now() - t;
timePerIteration = elapsed / (i+1);
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
return delayedDate;
}
}
const elapsed = performance.now() - t;
timePerIteration = elapsed / WINDOW_SIZE;
logger.log(`${timePerIteration.toFixed(3)}ms * ${WINDOW_SIZE}iterations`, "perf", "fn:adjustSnapshotTime");
return expectedStartTime; return expectedStartTime;
} }
// Get the target window start time
const windowStart = windowResult.rows[0].window_start;
// Add random delay within the 5-minute window to distribute load
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} catch {
return expectedStartTime; // Fallback to original time
}
}
export async function getSnapshotsInNextSecond(client: Client) { export async function getSnapshotsInNextSecond(client: Client) {
const query = ` const query = `
@ -198,7 +258,7 @@ export async function getSnapshotsInNextSecond(client: Client) {
ELSE 1 ELSE 1
END, END,
started_at started_at
LIMIT 3; LIMIT 10;
`; `;
const res = await client.queryObject<SnapshotScheduleType>(query, []); const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows; return res.rows;

View File

@ -1,4 +1,5 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(client: Client) { export async function getNotCollectedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(` const queryResult = await client.queryObject<{ aid: number }>(`
@ -27,3 +28,18 @@ export async function aidExistsInSongs(client: Client, aid: number) {
); );
return queryResult.rows[0].exists; return queryResult.rows[0].exists;
} }
export async function getSongsPublihsedAt(client: Client, aid: number) {
const queryResult = await client.queryObject<{ published_at: string }>(
`
SELECT published_at
FROM songs
WHERE aid = $1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return parseTimestampFromPsql(queryResult.rows[0].published_at);
}

View File

@ -7,6 +7,8 @@ import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { aidExistsInSongs } from "lib/db/songs.ts"; import { aidExistsInSongs } from "lib/db/songs.ts";
import { insertIntoSongs } from "lib/mq/task/collectSongs.ts"; import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export const classifyVideoWorker = async (job: Job) => { export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect(); const client = await db.connect();
@ -27,6 +29,7 @@ export const classifyVideoWorker = async (job: Job) => {
const exists = await aidExistsInSongs(client, aid); const exists = await aidExistsInSongs(client, aid);
if (!exists && label !== 0) { if (!exists && label !== 0) {
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
await insertIntoSongs(client, aid); await insertIntoSongs(client, aid);
} }

View File

@ -3,6 +3,7 @@ import { db } from "lib/db/init.ts";
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts"; import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
import { import {
findClosestSnapshot, findClosestSnapshot,
findSnapshotBefore,
getLatestSnapshot, getLatestSnapshot,
getSnapshotsInNextSecond, getSnapshotsInNextSecond,
getVideosWithoutActiveSnapshotSchedule, getVideosWithoutActiveSnapshotSchedule,
@ -18,9 +19,10 @@ import logger from "lib/log/logger.ts";
import { SnapshotQueue } from "lib/mq/index.ts"; import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts"; import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts"; import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts"; import { getBiliVideoStatus, setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts"; import { truncate } from "lib/utils/truncate.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { getSongsPublihsedAt } from "lib/db/songs.ts";
const priorityMap: { [key: string]: number } = { const priorityMap: { [key: string]: number } = {
"milestone": 1, "milestone": 1,
@ -30,6 +32,7 @@ const priorityMap: { [key: string]: number } = {
const snapshotTypeToTaskMap: { [key: string]: string } = { const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo", "milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo", "normal": "snapshotVideo",
"new": "snapshotMilestoneVideo",
}; };
export const snapshotTickWorker = async (_job: Job) => { export const snapshotTickWorker = async (_job: Job) => {
@ -37,11 +40,15 @@ export const snapshotTickWorker = async (_job: Job) => {
try { try {
const schedules = await getSnapshotsInNextSecond(client); const schedules = await getSnapshotsInNextSecond(client);
for (const schedule of schedules) { for (const schedule of schedules) {
if (await videoHasProcessingSchedule(client, schedule.aid)) {
return `ALREADY_PROCESSING`;
}
let priority = 3; let priority = 3;
if (schedule.type && priorityMap[schedule.type]) { if (schedule.type && priorityMap[schedule.type]) {
priority = priorityMap[schedule.type]; priority = priorityMap[schedule.type];
} }
const aid = Number(schedule.aid); const aid = Number(schedule.aid);
await setSnapshotStatus(client, schedule.id, "processing");
await SnapshotQueue.add("snapshotVideo", { await SnapshotQueue.add("snapshotVideo", {
aid: aid, aid: aid,
id: Number(schedule.id), id: Number(schedule.id),
@ -130,11 +137,30 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
} }
}; };
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
const now = Date.now();
const date = new Date(now - 24 * HOUR);
const oldSnapshot = await findSnapshotBefore(client, aid, date);
const latestSnapshot = await getLatestSnapshot(client, aid);
if (!oldSnapshot || !latestSnapshot) return 0;
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
if (hoursDiff < 8) return 24;
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
if (viewsDiff === 0) return 72;
const speedPerDay = viewsDiff / hoursDiff * 24;
if (speedPerDay < 6) return 36;
if (speedPerDay < 120) return 24;
if (speedPerDay < 320) return 12;
return 6;
};
export const regularSnapshotsWorker = async (_job: Job) => { export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect(); const client = await db.connect();
const startedAt = Date.now(); const startedAt = Date.now();
if (await lockManager.isLocked("dispatchRegularSnapshots")) { if (await lockManager.isLocked("dispatchRegularSnapshots")) {
logger.log("dispatchRegularSnapshots is already running", "mq"); logger.log("dispatchRegularSnapshots is already running", "mq");
client.release();
return; return;
} }
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60); await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
@ -145,7 +171,9 @@ export const regularSnapshotsWorker = async (_job: Job) => {
const latestSnapshot = await getLatestVideoSnapshot(client, aid); const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now(); const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now; const lastSnapshotedAt = latestSnapshot?.time ?? now;
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK); const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime); await scheduleSnapshot(client, aid, "normal", targetTime);
if (now - startedAt > 25 * MINUTE) { if (now - startedAt > 25 * MINUTE) {
return; return;
@ -168,23 +196,49 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE; const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const exists = await snapshotScheduleExists(client, id); const exists = await snapshotScheduleExists(client, id);
if (!exists) { if (!exists) {
client.release();
return; return;
} }
try { const status = await getBiliVideoStatus(client, aid);
if (await videoHasProcessingSchedule(client, aid)) { if (status !== 0) {
return `ALREADY_PROCESSING`; client.release();
return `REFUSE_WORKING_BILI_STATUS_${status}`;
} }
try {
await setSnapshotStatus(client, id, "processing"); await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task); const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") { if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat); await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed"); await setSnapshotStatus(client, id, "completed");
return `BILI_STATUS_${stat}`; return `GET_BILI_STATUS_${stat}`;
} }
await setSnapshotStatus(client, id, "completed"); await setSnapshotStatus(client, id, "completed");
if (type === "normal") { if (type === "normal") {
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR); const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
return `DONE`; return `DONE`;
} else if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(client, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
if (timeSincePublished > 48 * HOUR) {
return `DONE`;
}
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
return `DONE`;
}
let intervalMins = 240;
if (viewsPerHour > 50) {
intervalMins = 120;
}
if (viewsPerHour > 100) {
intervalMins = 60;
}
if (viewsPerHour > 1000) {
intervalMins = 15;
}
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
} }
if (type !== "milestone") return `DONE`; if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid); const eta = await getAdjustedShortTermETA(client, aid);
@ -210,3 +264,29 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
client.release(); client.release();
} }
}; };
export const scheduleCleanupWorker = async (_job: Job) => {
const client = await db.connect();
try {
const query = `
SELECT id, aid, type
FROM snapshot_schedule
WHERE status IN ('pending', 'processing')
AND started_at < NOW() - INTERVAL '5 minutes'
`;
const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
if (rows.length === 0) return;
for (const row of rows) {
const id = Number(row.id);
const aid = Number(row.aid);
const type = row.type;
await setSnapshotStatus(client, id, "timeout");
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
logger.log(`Schedule ${id} has no response received for 5 minutes, rescheduled.`, "mq", "fn:scheduleCleanupWorker")
}
} catch (e) {
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
} finally {
client.release();
}
};

View File

@ -1,20 +1,30 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { initSnapshotWindowCounts } from "lib/db/snapshotSchedule.ts";
import { db } from "lib/db/init.ts";
import { redis } from "lib/db/redis.ts";
export async function initMQ() { export async function initMQ() {
const client = await db.connect();
try {
await initSnapshotWindowCounts(client, redis);
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", { await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE, every: 1 * MINUTE,
immediately: true, immediately: true,
}); });
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", { await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE, every: 5 * MINUTE,
immediately: true, immediately: true,
}); });
await LatestVideosQueue.upsertJobScheduler("collectSongs", { await LatestVideosQueue.upsertJobScheduler("collectSongs", {
every: 3 * MINUTE, every: 3 * MINUTE,
immediately: true, immediately: true,
}); });
await SnapshotQueue.upsertJobScheduler("snapshotTick", { await SnapshotQueue.upsertJobScheduler("snapshotTick", {
every: 1 * SECOND, every: 1 * SECOND,
immediately: true, immediately: true,
@ -35,7 +45,13 @@ export async function initMQ() {
immediately: true, immediately: true,
}); });
await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick"); await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
every: 30 * MINUTE,
immediately: true,
});
logger.log("Message queue initialized."); logger.log("Message queue initialized.");
} finally {
client.release();
}
} }

View File

@ -333,6 +333,12 @@ const biliLimiterConfig: RateLimiterConfig[] = [
}, },
]; ];
const bili_test = biliLimiterConfig;
bili_test[0].max = 10;
bili_test[1].max = 36;
bili_test[2].max = 150;
bili_test[3].max = 1000;
/* /*
Execution order for setup: Execution order for setup:
@ -364,7 +370,7 @@ for (const region of regions) {
netScheduler.addTask("getVideoInfo", "bilibili", "all"); netScheduler.addTask("getVideoInfo", "bilibili", "all");
netScheduler.addTask("getLatestVideos", "bilibili", "all"); netScheduler.addTask("getLatestVideos", "bilibili", "all");
netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`)); netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`));
netScheduler.addTask("snapshotVideo", "bilibili", [ netScheduler.addTask("snapshotVideo", "bili_test", [
"alicloud-qingdao", "alicloud-qingdao",
"alicloud-shanghai", "alicloud-shanghai",
"alicloud-zhangjiakou", "alicloud-zhangjiakou",
@ -375,7 +381,8 @@ netScheduler.addTask("snapshotVideo", "bilibili", [
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig); netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
netScheduler.setTaskLimiter("getLatestVideos", null); netScheduler.setTaskLimiter("getLatestVideos", null);
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null); netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig); netScheduler.setTaskLimiter("snapshotVideo", null);
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig); netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
netScheduler.setProviderLimiter("bili_test", bili_test);
export default netScheduler; export default netScheduler;

View File

@ -1,6 +1,8 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts"; import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export async function collectSongs(client: Client) { export async function collectSongs(client: Client) {
const aids = await getNotCollectedSongs(client); const aids = await getNotCollectedSongs(client);
@ -8,6 +10,7 @@ export async function collectSongs(client: Client) {
const exists = await aidExistsInSongs(client, aid); const exists = await aidExistsInSongs(client, aid);
if (exists) continue; if (exists) continue;
await insertIntoSongs(client, aid); await insertIntoSongs(client, aid);
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs"); logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
} }
} }

18
lib/net/bilibili.d.ts vendored
View File

@ -9,6 +9,24 @@ export type VideoListResponse = BaseResponse<VideoListData>;
export type VideoDetailsResponse = BaseResponse<VideoDetailsData>; export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
export type VideoTagsResponse = BaseResponse<VideoTagsData>; export type VideoTagsResponse = BaseResponse<VideoTagsData>;
export type VideoInfoResponse = BaseResponse<VideoInfoData>; export type VideoInfoResponse = BaseResponse<VideoInfoData>;
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
type MediaListInfoData = MediaListInfoItem[];
interface MediaListInfoItem {
bvid: string;
id: number;
cnt_info: {
coin: number;
collect: number;
danmaku: number;
play: number;
reply: number;
share: number;
thumb_up: number;
}
}
interface VideoInfoData { interface VideoInfoData {
bvid: string; bvid: string;

View File

@ -10,6 +10,7 @@ import {
regularSnapshotsWorker, regularSnapshotsWorker,
snapshotTickWorker, snapshotTickWorker,
takeSnapshotForVideoWorker, takeSnapshotForVideoWorker,
scheduleCleanupWorker
} from "lib/mq/exec/snapshotTick.ts"; } from "lib/mq/exec/snapshotTick.ts";
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
@ -80,11 +81,14 @@ const snapshotWorker = new Worker(
case "dispatchRegularSnapshots": case "dispatchRegularSnapshots":
await regularSnapshotsWorker(job); await regularSnapshotsWorker(job);
break; break;
case "scheduleCleanup":
await scheduleCleanupWorker(job);
break;
default: default:
break; break;
} }
}, },
{ connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } }, { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } },
); );
snapshotWorker.on("error", (err) => { snapshotWorker.on("error", (err) => {
@ -94,4 +98,4 @@ snapshotWorker.on("error", (err) => {
snapshotWorker.on("closed", async () => { snapshotWorker.on("closed", async () => {
await lockManager.releaseLock("dispatchRegularSnapshots"); await lockManager.releaseLock("dispatchRegularSnapshots");
}) });

View File

@ -1,18 +0,0 @@
import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
const client = new Client(postgresConfig);
try {
const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
assertNotEquals(result, null);
const created_at = result!.created_at;
const views = result!.views;
assertInstanceOf(created_at, Date);
assertEquals(typeof views, "number");
} finally {
client.end();
}
});