Compare commits
No commits in common. "d44ba8a0aee3f93802905a3bbb1dada481349fdc" and "0455abce2e9916cc1ad0d3375b3894b789c13623" have entirely different histories.
d44ba8a0ae ... 0455abce2e
@@ -12,7 +12,7 @@
     "update": "deno run -A -r https://fresh.deno.dev/update .",
     "worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
     "worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
-    "adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
+    "adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
     "bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
     "all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
     "test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
@@ -75,13 +75,3 @@ export async function setBiliVideoStatus(client: Client, aid: number, status: nu
         [status, aid],
     );
 }
-
-export async function getBiliVideoStatus(client: Client, aid: number) {
-    const queryResult = await client.queryObject<{ status: number }>(
-        `SELECT status FROM bilibili_metadata WHERE aid = $1`,
-        [aid],
-    );
-    const rows = queryResult.rows;
-    if (rows.length === 0) return 0;
-    return rows[0].status;
-}
@@ -7,7 +7,6 @@ export async function getVideosNearMilestone(client: Client) {
         FROM latest_video_snapshot ls
         INNER JOIN
             songs s ON ls.aid = s.aid
-            AND s.deleted = false
         WHERE
             s.deleted = false AND
             (views >= 90000 AND views < 100000) OR
@@ -3,62 +3,6 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
 import { SnapshotScheduleType } from "./schema.d.ts";
 import logger from "lib/log/logger.ts";
 import { MINUTE } from "$std/datetime/constants.ts";
-import { redis } from "lib/db/redis.ts";
-import { Redis } from "ioredis";
-
-const WINDOW_SIZE = 2880;
-const REDIS_KEY = "cvsa:snapshot_window_counts";
-
-let lastAvailableWindow: { offset: number; count: number } | null = null;
-
-function getCurrentWindowIndex(): number {
-    const now = new Date();
-    const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
-    const currentWindow = Math.floor(minutesSinceMidnight / 5);
-    return currentWindow;
-}
-
-export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) {
-    const now = new Date();
-    const startTime = now.getTime();
-
-    const result = await client.queryObject<{ window_start: Date; count: number }>`
-        SELECT
-            date_trunc('hour', started_at) +
-            (EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
-            COUNT(*) AS count
-        FROM snapshot_schedule
-        WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
-        GROUP BY 1
-        ORDER BY window_start
-    `
-
-    await redisClient.del(REDIS_KEY);
-
-    const currentWindow = getCurrentWindowIndex();
-
-    for (const row of result.rows) {
-        const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
-        const offset = (currentWindow + targetOffset);
-        if (offset >= 0 && offset < WINDOW_SIZE) {
-            await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
-        }
-    }
-
-    lastAvailableWindow = null;
-}
-
-export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
-    await refreshSnapshotWindowCounts(client, redisClient);
-    setInterval(async () => {
-        await refreshSnapshotWindowCounts(client, redisClient);
-    }, 5 * MINUTE);
-}
-
-async function getWindowCount(redisClient: Redis, offset: number): Promise<number> {
-    const count = await redisClient.hget(REDIS_KEY, offset.toString());
-    return count ? parseInt(count, 10) : 0;
-}
-
 export async function snapshotScheduleExists(client: Client, id: number) {
     const res = await client.queryObject<{ id: number }>(
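
For orientation, the block removed above is the d44ba8a0ae side's Redis-backed rate bookkeeping: pending schedules are counted per 5-minute window in a Redis hash, and adjustSnapshotTime books slots against those counters instead of querying Postgres every time. The following is a minimal self-contained sketch of that bookkeeping, with a plain Map standing in for the Redis hash; it is an illustration only, not code from either commit.

// Sketch only: a Map stands in for the Redis hash "cvsa:snapshot_window_counts".
const WINDOW_SIZE = 2880; // 2880 five-minute windows = 10 days
const windowCounts = new Map<number, number>();

function windowIndexFor(date: Date): number {
    // Index of the 5-minute window within the day, mirroring getCurrentWindowIndex().
    const minutesSinceMidnight = date.getHours() * 60 + date.getMinutes();
    return Math.floor(minutesSinceMidnight / 5);
}

function tryReserveWindow(offset: number, allowedCounts: number): boolean {
    const current = windowCounts.get(offset) ?? 0;
    if (offset < 0 || offset >= WINDOW_SIZE || current >= allowedCounts) return false;
    windowCounts.set(offset, current + 1); // mirrors HINCRBY on the Redis hash
    return true;
}

console.log(tryReserveWindow(windowIndexFor(new Date()), 1000)); // true while the window has capacity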
@@ -68,6 +12,9 @@ export async function snapshotScheduleExists(client: Client, id: number) {
     return res.rows.length > 0;
 }
 
+/*
+    Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
+*/
 export async function videoHasActiveSchedule(client: Client, aid: number) {
     const res = await client.queryObject<{ status: string }>(
         `SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
@@ -113,31 +60,6 @@ export async function findClosestSnapshot(
     };
 }
 
-export async function findSnapshotBefore(
-    client: Client,
-    aid: number,
-    targetTime: Date,
-): Promise<Snapshot | null> {
-    const query = `
-        SELECT created_at, views
-        FROM video_snapshot
-        WHERE aid = $1
-            AND created_at <= $2::timestamptz
-        ORDER BY created_at DESC
-        LIMIT 1
-    `;
-    const result = await client.queryObject<{ created_at: string; views: number }>(
-        query,
-        [aid, targetTime.toISOString()],
-    );
-    if (result.rows.length === 0) return null;
-    const row = result.rows[0];
-    return {
-        created_at: new Date(row.created_at).getTime(),
-        views: row.views,
-    };
-}
-
 export async function hasAtLeast2Snapshots(client: Client, aid: number) {
     const res = await client.queryObject<{ count: number }>(
         `SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
@@ -183,12 +105,10 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
  * @param aid The aid of the video.
  * @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
  */
-export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) {
-    if (await videoHasActiveSchedule(client, aid) && !force) return;
-    let adjustedTime = new Date(targetTime);
-    if (type !== "milestone" && type !== "new") {
-        adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
-    }
+export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
+    if (await videoHasActiveSchedule(client, aid)) return;
+    const allowedCount = type === "milestone" ? 2000 : 800;
+    const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
     logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
     return client.queryObject(
         `INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
@@ -196,56 +116,76 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
     );
 }
 
+/**
+ * Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
+ * @param client PostgreSQL client
+ * @param expectedStartTime The expected snapshot time
+ * @param allowedCounts The number of snapshots allowed in a 5-minute window (default: 2000)
+ * @returns The adjusted actual snapshot time within the first available window
+ */
 export async function adjustSnapshotTime(
+    client: Client,
     expectedStartTime: Date,
-    allowedCounts: number = 1000,
-    redisClient: Redis,
+    allowedCounts: number = 2000,
 ): Promise<Date> {
-    const currentWindow = getCurrentWindowIndex();
-    const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
-
-    let initialOffset = currentWindow + Math.max(targetOffset, 0);
-
-    if (lastAvailableWindow && lastAvailableWindow.count < allowedCounts) {
-        initialOffset = Math.max(lastAvailableWindow.offset - 2, 0);
-    }
-
-    let timePerIteration = 0;
-    const t = performance.now();
-    for (let i = initialOffset; i < WINDOW_SIZE; i++) {
-        const offset = i;
-        const count = await getWindowCount(redisClient, offset);
-
-        if (count < allowedCounts) {
-            const newCount = await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
-            lastAvailableWindow = { offset, count: newCount };
-
-            const startPoint = new Date();
-            startPoint.setHours(0, 0, 0, 0);
-            const startTime = startPoint.getTime();
-            const windowStart = startTime + offset * 5 * MINUTE;
-            const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
-            const delayedDate = new Date(windowStart + randomDelay);
-            const now = new Date();
-
-            if (delayedDate.getTime() < now.getTime()) {
-                const elapsed = performance.now() - t;
-                timePerIteration = elapsed / (i+1);
-                logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
-                return now;
-            }
-            const elapsed = performance.now() - t;
-            timePerIteration = elapsed / (i+1);
-            logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
-            return delayedDate;
-        }
-    }
-    const elapsed = performance.now() - t;
-    timePerIteration = elapsed / WINDOW_SIZE;
-    logger.log(`${timePerIteration.toFixed(3)}ms * ${WINDOW_SIZE}iterations`, "perf", "fn:adjustSnapshotTime");
-    return expectedStartTime;
-}
+    // Query to find the closest available window by checking both past and future windows
+    const findWindowQuery = `
+        WITH base AS (
+            SELECT
+                date_trunc('minute', $1::timestamp)
+                - (EXTRACT(minute FROM $1::timestamp)::int % 5 * INTERVAL '1 minute') AS base_time
+        ),
+        offsets AS (
+            SELECT generate_series(-100, 100) AS "offset"
+        ),
+        candidate_windows AS (
+            SELECT
+                (base.base_time + ("offset" * INTERVAL '5 minutes')) AS window_start,
+                ABS("offset") AS distance
+            FROM base
+            CROSS JOIN offsets
+        )
+        SELECT
+            window_start
+        FROM
+            candidate_windows cw
+        LEFT JOIN
+            snapshot_schedule s
+        ON
+            s.started_at >= cw.window_start
+            AND s.started_at < cw.window_start + INTERVAL '5 minutes'
+            AND s.status = 'pending'
+        GROUP BY
+            cw.window_start, cw.distance
+        HAVING
+            COUNT(s.*) < $2
+        ORDER BY
+            cw.distance, cw.window_start
+        LIMIT 1;
+    `;
+
+    try {
+        // Execute query to find the first available window
+        const windowResult = await client.queryObject<{ window_start: Date }>(
+            findWindowQuery,
+            [expectedStartTime, allowedCounts],
+        );
+
+        // If no available window found, return original time (may exceed limit)
+        if (windowResult.rows.length === 0) {
+            return expectedStartTime;
+        }
+
+        // Get the target window start time
+        const windowStart = windowResult.rows[0].window_start;
+
+        // Add random delay within the 5-minute window to distribute load
+        const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
+        return new Date(windowStart.getTime() + randomDelay);
+    } catch {
+        return expectedStartTime; // Fallback to original time
+    }
+}
 
 export async function getSnapshotsInNextSecond(client: Client) {
     const query = `
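
Both the Redis-walking version on the left and the SQL version on the right finish the same way: pick a 5-minute window that still has capacity, then add a random delay inside that window so scheduled snapshots do not all fire on the window boundary. A tiny sketch of that final step, assuming epoch-aligned 5-minute windows (an assumption of the sketch, not something either commit states):

const MINUTE = 60 * 1000; // same constant as $std/datetime/constants.ts

function spreadWithinWindow(windowStartMs: number): Date {
    // Jitter the scheduled time uniformly across the chosen 5-minute window.
    const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
    return new Date(windowStartMs + randomDelay);
}

// Round a requested time down to its 5-minute window, then jitter inside it.
const requested = Date.now() + 30 * MINUTE;
const windowStart = Math.floor(requested / (5 * MINUTE)) * (5 * MINUTE);
console.log(spreadWithinWindow(windowStart).toISOString());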
@@ -258,7 +198,7 @@ export async function getSnapshotsInNextSecond(client: Client) {
             ELSE 1
         END,
         started_at
-    LIMIT 10;
+    LIMIT 3;
     `;
     const res = await client.queryObject<SnapshotScheduleType>(query, []);
     return res.rows;
@@ -1,5 +1,4 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
 
 export async function getNotCollectedSongs(client: Client) {
     const queryResult = await client.queryObject<{ aid: number }>(`
@@ -23,23 +22,8 @@ export async function aidExistsInSongs(client: Client, aid: number) {
             FROM songs
             WHERE aid = $1
         );
         `,
         [aid],
     );
     return queryResult.rows[0].exists;
 }
-
-export async function getSongsPublihsedAt(client: Client, aid: number) {
-    const queryResult = await client.queryObject<{ published_at: string }>(
-        `
-        SELECT published_at
-        FROM songs
-        WHERE aid = $1;
-        `,
-        [aid],
-    );
-    if (queryResult.rows.length === 0) {
-        return null;
-    }
-    return parseTimestampFromPsql(queryResult.rows[0].published_at);
-}
@@ -7,8 +7,6 @@ import logger from "lib/log/logger.ts";
 import { lockManager } from "lib/mq/lockManager.ts";
 import { aidExistsInSongs } from "lib/db/songs.ts";
 import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
-import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
 
 export const classifyVideoWorker = async (job: Job) => {
     const client = await db.connect();
@@ -29,7 +27,6 @@ export const classifyVideoWorker = async (job: Job) => {
 
     const exists = await aidExistsInSongs(client, aid);
     if (!exists && label !== 0) {
-        await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
         await insertIntoSongs(client, aid);
     }
 
@@ -3,7 +3,6 @@ import { db } from "lib/db/init.ts";
 import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
 import {
     findClosestSnapshot,
-    findSnapshotBefore,
     getLatestSnapshot,
     getSnapshotsInNextSecond,
     getVideosWithoutActiveSnapshotSchedule,
@@ -19,10 +18,9 @@ import logger from "lib/log/logger.ts";
 import { SnapshotQueue } from "lib/mq/index.ts";
 import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
 import { NetSchedulerError } from "lib/mq/scheduler.ts";
-import { getBiliVideoStatus, setBiliVideoStatus } from "lib/db/allData.ts";
+import { setBiliVideoStatus } from "lib/db/allData.ts";
 import { truncate } from "lib/utils/truncate.ts";
 import { lockManager } from "lib/mq/lockManager.ts";
-import { getSongsPublihsedAt } from "lib/db/songs.ts";
 
 const priorityMap: { [key: string]: number } = {
     "milestone": 1,
@@ -32,7 +30,6 @@ const priorityMap: { [key: string]: number } = {
 const snapshotTypeToTaskMap: { [key: string]: string } = {
     "milestone": "snapshotMilestoneVideo",
     "normal": "snapshotVideo",
-    "new": "snapshotMilestoneVideo",
 };
 
 export const snapshotTickWorker = async (_job: Job) => {
@@ -40,15 +37,11 @@ export const snapshotTickWorker = async (_job: Job) => {
     try {
         const schedules = await getSnapshotsInNextSecond(client);
         for (const schedule of schedules) {
-            if (await videoHasProcessingSchedule(client, schedule.aid)) {
-                return `ALREADY_PROCESSING`;
-            }
             let priority = 3;
             if (schedule.type && priorityMap[schedule.type]) {
                 priority = priorityMap[schedule.type];
             }
             const aid = Number(schedule.aid);
-            await setSnapshotStatus(client, schedule.id, "processing");
             await SnapshotQueue.add("snapshotVideo", {
                 aid: aid,
                 id: Number(schedule.id),
@@ -137,30 +130,11 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
     }
 };
 
-const getRegularSnapshotInterval = async (client: Client, aid: number) => {
-    const now = Date.now();
-    const date = new Date(now - 24 * HOUR);
-    const oldSnapshot = await findSnapshotBefore(client, aid, date);
-    const latestSnapshot = await getLatestSnapshot(client, aid);
-    if (!oldSnapshot || !latestSnapshot) return 0;
-    if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
-    const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
-    if (hoursDiff < 8) return 24;
-    const viewsDiff = latestSnapshot.views - oldSnapshot.views;
-    if (viewsDiff === 0) return 72;
-    const speedPerDay = viewsDiff / hoursDiff * 24;
-    if (speedPerDay < 6) return 36;
-    if (speedPerDay < 120) return 24;
-    if (speedPerDay < 320) return 12;
-    return 6;
-};
-
 export const regularSnapshotsWorker = async (_job: Job) => {
     const client = await db.connect();
     const startedAt = Date.now();
     if (await lockManager.isLocked("dispatchRegularSnapshots")) {
         logger.log("dispatchRegularSnapshots is already running", "mq");
-        client.release();
         return;
     }
     await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
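
The removed getRegularSnapshotInterval boils down to a mapping from recent view velocity to the next snapshot interval in hours. Restated as a standalone sketch (the real helper derives hoursCovered and viewsGained from two stored snapshots; the parameter names here are mine):

function intervalHours(hoursCovered: number, viewsGained: number): number {
    if (hoursCovered < 8) return 24;  // not enough history yet
    if (viewsGained === 0) return 72; // stagnant video, check rarely
    const speedPerDay = viewsGained / hoursCovered * 24;
    if (speedPerDay < 6) return 36;
    if (speedPerDay < 120) return 24;
    if (speedPerDay < 320) return 12;
    return 6;                         // fast mover, check every 6 hours
}

console.log(intervalHours(24, 100)); // 24: about 100 views/day falls in the 6 to 120 band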
@@ -171,12 +145,10 @@ export const regularSnapshotsWorker = async (_job: Job) => {
             const latestSnapshot = await getLatestVideoSnapshot(client, aid);
             const now = Date.now();
             const lastSnapshotedAt = latestSnapshot?.time ?? now;
-            const interval = await getRegularSnapshotInterval(client, aid);
-            logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
-            const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
+            const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
             await scheduleSnapshot(client, aid, "normal", targetTime);
             if (now - startedAt > 25 * MINUTE) {
                 return;
             }
         }
     } catch (e) {
@@ -196,49 +168,23 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
     const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
     const exists = await snapshotScheduleExists(client, id);
     if (!exists) {
-        client.release();
         return;
     }
-    const status = await getBiliVideoStatus(client, aid);
-    if (status !== 0) {
-        client.release();
-        return `REFUSE_WORKING_BILI_STATUS_${status}`;
-    }
     try {
+        if (await videoHasProcessingSchedule(client, aid)) {
+            return `ALREADY_PROCESSING`;
+        }
         await setSnapshotStatus(client, id, "processing");
         const stat = await insertVideoSnapshot(client, aid, task);
         if (typeof stat === "number") {
             await setBiliVideoStatus(client, aid, stat);
             await setSnapshotStatus(client, id, "completed");
-            return `GET_BILI_STATUS_${stat}`;
+            return `BILI_STATUS_${stat}`;
         }
         await setSnapshotStatus(client, id, "completed");
         if (type === "normal") {
-            const interval = await getRegularSnapshotInterval(client, aid);
-            logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
-            await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
+            await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
             return `DONE`;
-        } else if (type === "new") {
-            const publihsedAt = await getSongsPublihsedAt(client, aid);
-            const timeSincePublished = stat.time - publihsedAt!;
-            const viewsPerHour = stat.views / timeSincePublished * HOUR;
-            if (timeSincePublished > 48 * HOUR) {
-                return `DONE`;
-            }
-            if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
-                return `DONE`;
-            }
-            let intervalMins = 240;
-            if (viewsPerHour > 50) {
-                intervalMins = 120;
-            }
-            if (viewsPerHour > 100) {
-                intervalMins = 60;
-            }
-            if (viewsPerHour > 1000) {
-                intervalMins = 15;
-            }
-            await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
         }
         if (type !== "milestone") return `DONE`;
         const eta = await getAdjustedShortTermETA(client, aid);
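
The deleted type === "new" branch gives freshly collected songs their own cadence: tracking stops 48 hours after publication, or after 2 hours if the video is gaining fewer than 10 views per hour, and otherwise the hotter the video, the sooner the next snapshot. A standalone sketch of that decision, returning the wait in minutes and null for "stop tracking" (the function name and shape are mine, not the worker's):

const HOUR = 60 * 60 * 1000;

function nextNewVideoSnapshotInMinutes(msSincePublished: number, views: number): number | null {
    const viewsPerHour = views / msSincePublished * HOUR;
    if (msSincePublished > 48 * HOUR) return null;                     // too old, stop tracking
    if (msSincePublished > 2 * HOUR && viewsPerHour < 10) return null; // too slow, stop tracking
    if (viewsPerHour > 1000) return 15;
    if (viewsPerHour > 100) return 60;
    if (viewsPerHour > 50) return 120;
    return 240;
}

console.log(nextNewVideoSnapshotInMinutes(6 * HOUR, 1200)); // 60: about 200 views/hour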
@@ -264,29 +210,3 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
         client.release();
     }
 };
-
-export const scheduleCleanupWorker = async (_job: Job) => {
-    const client = await db.connect();
-    try {
-        const query = `
-            SELECT id, aid, type
-            FROM snapshot_schedule
-            WHERE status IN ('pending', 'processing')
-                AND started_at < NOW() - INTERVAL '5 minutes'
-        `;
-        const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
-        if (rows.length === 0) return;
-        for (const row of rows) {
-            const id = Number(row.id);
-            const aid = Number(row.aid);
-            const type = row.type;
-            await setSnapshotStatus(client, id, "timeout");
-            await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
-            logger.log(`Schedule ${id} has no response received for 5 minutes, rescheduled.`, "mq", "fn:scheduleCleanupWorker")
-        }
-    } catch (e) {
-        logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
-    } finally {
-        client.release();
-    }
-};
@@ -1,57 +1,41 @@
 import { MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
 import logger from "lib/log/logger.ts";
-import { initSnapshotWindowCounts } from "lib/db/snapshotSchedule.ts";
-import { db } from "lib/db/init.ts";
-import { redis } from "lib/db/redis.ts";
 
 export async function initMQ() {
-    const client = await db.connect();
-    try {
-        await initSnapshotWindowCounts(client, redis);
-
-        await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
-            every: 1 * MINUTE,
-            immediately: true,
-        });
-
-        await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
-            every: 5 * MINUTE,
-            immediately: true,
-        });
-
-        await LatestVideosQueue.upsertJobScheduler("collectSongs", {
-            every: 3 * MINUTE,
-            immediately: true,
-        });
-
-        await SnapshotQueue.upsertJobScheduler("snapshotTick", {
-            every: 1 * SECOND,
-            immediately: true,
-        }, {
-            opts: {
-                removeOnComplete: 1,
-                removeOnFail: 1,
-            },
-        });
-
-        await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
-            every: 5 * MINUTE,
-            immediately: true,
-        });
-
-        await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
-            every: 30 * MINUTE,
-            immediately: true,
-        });
-
-        await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
-            every: 30 * MINUTE,
-            immediately: true,
-        });
-
-        logger.log("Message queue initialized.");
-    } finally {
-        client.release();
-    }
+    await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
+        every: 1 * MINUTE,
+        immediately: true,
+    });
+    await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
+        every: 5 * MINUTE,
+        immediately: true,
+    });
+    await LatestVideosQueue.upsertJobScheduler("collectSongs", {
+        every: 3 * MINUTE,
+        immediately: true,
+    });
+    await SnapshotQueue.upsertJobScheduler("snapshotTick", {
+        every: 1 * SECOND,
+        immediately: true,
+    }, {
+        opts: {
+            removeOnComplete: 1,
+            removeOnFail: 1,
+        },
+    });
+
+    await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
+        every: 5 * MINUTE,
+        immediately: true,
+    });
+
+    await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
+        every: 30 * MINUTE,
+        immediately: true,
+    });
+
+    await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick");
+
+    logger.log("Message queue initialized.");
 }
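
For readers who have not used BullMQ job schedulers: upsertJobScheduler creates or updates a named repeatable job in one call, so initMQ() can run on every boot without stacking duplicate schedulers, and removeJobScheduler retires a scheduler by that same name, which is how the 0455abce2e side drops the obsolete "scheduleSnapshotTick" job. A minimal sketch with a placeholder queue name and a local Redis connection (both assumptions of the sketch, not taken from either commit):

import { Queue } from "bullmq";

// Placeholder queue; both commits construct their real queues in lib/mq/index.ts.
const queue = new Queue("example", { connection: { host: "127.0.0.1", port: 6379 } });

// Create or update the repeatable job identified by this scheduler id.
await queue.upsertJobScheduler("tick", { every: 1000, immediately: true });

// Remove a scheduler by id once it is no longer wanted.
await queue.removeJobScheduler("tick");

await queue.close();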
@@ -333,12 +333,6 @@ const biliLimiterConfig: RateLimiterConfig[] = [
     },
 ];
 
-const bili_test = biliLimiterConfig;
-bili_test[0].max = 10;
-bili_test[1].max = 36;
-bili_test[2].max = 150;
-bili_test[3].max = 1000;
-
 /*
 Execution order for setup:
 
@@ -370,7 +364,7 @@ for (const region of regions) {
 netScheduler.addTask("getVideoInfo", "bilibili", "all");
 netScheduler.addTask("getLatestVideos", "bilibili", "all");
 netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`));
-netScheduler.addTask("snapshotVideo", "bili_test", [
+netScheduler.addTask("snapshotVideo", "bilibili", [
     "alicloud-qingdao",
     "alicloud-shanghai",
     "alicloud-zhangjiakou",
@@ -381,8 +375,7 @@ netScheduler.addTask("snapshotVideo", "bili_test", [
 netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
 netScheduler.setTaskLimiter("getLatestVideos", null);
 netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
-netScheduler.setTaskLimiter("snapshotVideo", null);
+netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig);
 netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
-netScheduler.setProviderLimiter("bili_test", bili_test);
 
 export default netScheduler;
@@ -1,8 +1,6 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
 import logger from "lib/log/logger.ts";
-import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
 
 export async function collectSongs(client: Client) {
     const aids = await getNotCollectedSongs(client);
@@ -10,7 +8,6 @@ export async function collectSongs(client: Client) {
         const exists = await aidExistsInSongs(client, aid);
         if (exists) continue;
         await insertIntoSongs(client, aid);
-        await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
         logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
     }
 }
lib/net/bilibili.d.ts (vendored): 18 lines changed
@@ -9,24 +9,6 @@ export type VideoListResponse = BaseResponse<VideoListData>;
 export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
 export type VideoTagsResponse = BaseResponse<VideoTagsData>;
 export type VideoInfoResponse = BaseResponse<VideoInfoData>;
-export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
-
-type MediaListInfoData = MediaListInfoItem[];
-
-
-interface MediaListInfoItem {
-    bvid: string;
-    id: number;
-    cnt_info: {
-        coin: number;
-        collect: number;
-        danmaku: number;
-        play: number;
-        reply: number;
-        share: number;
-        thumb_up: number;
-    }
-}
 
 interface VideoInfoData {
     bvid: string;
@@ -10,7 +10,6 @@ import {
     regularSnapshotsWorker,
     snapshotTickWorker,
     takeSnapshotForVideoWorker,
-    scheduleCleanupWorker
 } from "lib/mq/exec/snapshotTick.ts";
 
 Deno.addSignalListener("SIGINT", async () => {
@@ -81,14 +80,11 @@ const snapshotWorker = new Worker(
             case "dispatchRegularSnapshots":
                 await regularSnapshotsWorker(job);
                 break;
-            case "scheduleCleanup":
-                await scheduleCleanupWorker(job);
-                break;
             default:
                 break;
         }
     },
-    { connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } },
+    { connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } },
 );
 
 snapshotWorker.on("error", (err) => {
@@ -98,4 +94,4 @@ snapshotWorker.on("error", (err) => {
 
 snapshotWorker.on("closed", async () => {
     await lockManager.releaseLock("dispatchRegularSnapshots");
-});
+})
test/db/snapshotSchedule.test.ts (new file): 18 lines added
@@ -0,0 +1,18 @@
+import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
+import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
+import { postgresConfig } from "lib/db/pgConfig.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+
+Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
+    const client = new Client(postgresConfig);
+    try {
+        const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
+        assertNotEquals(result, null);
+        const created_at = result!.created_at;
+        const views = result!.views;
+        assertInstanceOf(created_at, Date);
+        assertEquals(typeof views, "number");
+    } finally {
+        client.end();
+    }
+});