Compare commits
4 Commits
b201bfd64d
...
7768a202b2
Author | SHA1 | Date | |
---|---|---|---|
7768a202b2 | |||
8652ac8fb7 | |||
18fc9752bb | |||
2c12310e8c |
@ -68,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
|
||||
const rows = queryResult.rows;
|
||||
return rows.map((row) => row.uid);
|
||||
}
|
||||
|
||||
export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
|
||||
return await client.queryObject(
|
||||
`UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
|
||||
[status, aid],
|
||||
);
|
||||
}
|
||||
|
12
lib/db/schema.d.ts
vendored
12
lib/db/schema.d.ts
vendored
@ -32,6 +32,18 @@ export interface VideoSnapshotType {
|
||||
replies: number;
|
||||
}
|
||||
|
||||
export interface LatestSnapshotType {
|
||||
aid: number;
|
||||
time: number;
|
||||
views: number;
|
||||
danmakus: number;
|
||||
replies: number;
|
||||
likes: number;
|
||||
coins: number;
|
||||
shares: number;
|
||||
favorites: number;
|
||||
}
|
||||
|
||||
export interface SnapshotScheduleType {
|
||||
id: number;
|
||||
aid: number;
|
||||
|
@ -1,35 +1,17 @@
|
||||
import { DAY, SECOND } from "$std/datetime/constants.ts";
|
||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { VideoSnapshotType } from "lib/db/schema.d.ts";
|
||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
||||
|
||||
export async function getVideosNearMilestone(client: Client) {
|
||||
const queryResult = await client.queryObject<VideoSnapshotType>(`
|
||||
WITH filtered_snapshots AS (
|
||||
SELECT
|
||||
vs.*
|
||||
FROM
|
||||
video_snapshot vs
|
||||
WHERE
|
||||
(vs.views >= 90000 AND vs.views < 100000) OR
|
||||
(vs.views >= 900000 AND vs.views < 1000000)
|
||||
),
|
||||
ranked_snapshots AS (
|
||||
SELECT
|
||||
fs.*,
|
||||
ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn,
|
||||
MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid
|
||||
FROM
|
||||
filtered_snapshots fs
|
||||
INNER JOIN
|
||||
songs s ON fs.aid = s.aid
|
||||
)
|
||||
SELECT
|
||||
rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies
|
||||
FROM
|
||||
ranked_snapshots rs
|
||||
WHERE
|
||||
rs.rn = 1;
|
||||
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||
SELECT ls.*
|
||||
FROM latest_video_snapshot ls
|
||||
INNER JOIN
|
||||
songs s ON ls.aid = s.aid
|
||||
WHERE
|
||||
s.deleted = false AND
|
||||
(views >= 90000 AND views < 100000) OR
|
||||
(views >= 900000 AND views < 1000000) OR
|
||||
(views >= 9900000 AND views < 10000000)
|
||||
`);
|
||||
return queryResult.rows.map((row) => {
|
||||
return {
|
||||
@ -39,160 +21,19 @@ export async function getVideosNearMilestone(client: Client) {
|
||||
});
|
||||
}
|
||||
|
||||
export async function getUnsnapshotedSongs(client: Client) {
|
||||
const queryResult = await client.queryObject<{ aid: bigint }>(`
|
||||
SELECT DISTINCT s.aid
|
||||
FROM songs s
|
||||
LEFT JOIN video_snapshot v ON s.aid = v.aid
|
||||
WHERE v.aid IS NULL;
|
||||
`);
|
||||
return queryResult.rows.map((row) => Number(row.aid));
|
||||
}
|
||||
|
||||
export async function getSongSnapshotCount(client: Client, aid: number) {
|
||||
const queryResult = await client.queryObject<{ count: number }>(
|
||||
`
|
||||
SELECT COUNT(*) AS count
|
||||
FROM video_snapshot
|
||||
WHERE aid = $1;
|
||||
`,
|
||||
[aid],
|
||||
);
|
||||
return queryResult.rows[0].count;
|
||||
}
|
||||
|
||||
export async function getShortTermEtaPrediction(client: Client, aid: number) {
|
||||
const queryResult = await client.queryObject<{ eta: number }>(
|
||||
`
|
||||
WITH old_snapshot AS (
|
||||
SELECT created_at, views
|
||||
FROM video_snapshot
|
||||
WHERE aid = $1 AND
|
||||
NOW() - created_at > '20 min'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
),
|
||||
new_snapshot AS (
|
||||
SELECT created_at, views
|
||||
FROM video_snapshot
|
||||
WHERE aid = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
)
|
||||
SELECT
|
||||
CASE
|
||||
WHEN n.views > 100000
|
||||
THEN
|
||||
(1000000 - n.views) -- Views remaining
|
||||
/
|
||||
(
|
||||
(n.views - o.views) -- Views delta
|
||||
/
|
||||
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
|
||||
+ 0.001
|
||||
) -- Increment per second
|
||||
ELSE
|
||||
(100000 - n.views) -- Views remaining
|
||||
/
|
||||
(
|
||||
(n.views - o.views) -- Views delta
|
||||
/
|
||||
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
|
||||
+ 0.001
|
||||
) -- Increment per second
|
||||
END AS eta
|
||||
FROM old_snapshot o, new_snapshot n;
|
||||
`,
|
||||
[aid],
|
||||
);
|
||||
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
|
||||
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||
SELECT *
|
||||
FROM latest_video_snapshot
|
||||
WHERE aid = $1
|
||||
`, [aid]);
|
||||
if (queryResult.rows.length === 0) {
|
||||
return null;
|
||||
}
|
||||
return queryResult.rows[0].eta;
|
||||
}
|
||||
|
||||
export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) {
|
||||
const queryResult = await client.queryObject<{ interval: number }>(
|
||||
`
|
||||
SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval
|
||||
FROM video_snapshot
|
||||
WHERE aid = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1;
|
||||
`,
|
||||
[aid],
|
||||
);
|
||||
if (queryResult.rows.length === 0) {
|
||||
return null;
|
||||
}
|
||||
return queryResult.rows[0].interval;
|
||||
}
|
||||
|
||||
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
|
||||
const count = await getSongSnapshotCount(client, aid);
|
||||
if (count < 2) {
|
||||
return true;
|
||||
}
|
||||
const queryResult = await client.queryObject<
|
||||
{ views1: number; created_at1: string; views2: number; created_at2: string }
|
||||
>(
|
||||
`
|
||||
WITH latest_snapshot AS (
|
||||
SELECT
|
||||
aid,
|
||||
views,
|
||||
created_at
|
||||
FROM video_snapshot
|
||||
WHERE aid = $1
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
),
|
||||
pairs AS (
|
||||
SELECT
|
||||
a.views AS views1,
|
||||
a.created_at AS created_at1,
|
||||
b.views AS views2,
|
||||
b.created_at AS created_at2,
|
||||
(b.created_at - a.created_at) AS interval
|
||||
FROM video_snapshot a
|
||||
JOIN latest_snapshot b
|
||||
ON a.aid = b.aid
|
||||
AND a.created_at < b.created_at
|
||||
)
|
||||
SELECT
|
||||
views1,
|
||||
created_at1,
|
||||
views2,
|
||||
created_at2
|
||||
FROM (
|
||||
SELECT
|
||||
*,
|
||||
ROW_NUMBER() OVER (
|
||||
ORDER BY
|
||||
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
|
||||
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
|
||||
) AS rn
|
||||
FROM pairs
|
||||
) ranked
|
||||
WHERE rn = 1;
|
||||
`,
|
||||
[aid],
|
||||
);
|
||||
if (queryResult.rows.length === 0) {
|
||||
return true;
|
||||
}
|
||||
const recentViewsData = queryResult.rows[0];
|
||||
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
|
||||
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
|
||||
const intervalSec = (time2 - time1) / SECOND;
|
||||
const views1 = recentViewsData.views1;
|
||||
const views2 = recentViewsData.views2;
|
||||
const viewsDiff = views2 - views1;
|
||||
if (viewsDiff == 0) {
|
||||
return false;
|
||||
}
|
||||
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
|
||||
const expectedViewsDiff = nextMilestone - views2;
|
||||
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
|
||||
return expectedIntervalSec <= 3 * DAY;
|
||||
return queryResult.rows.map((row) => {
|
||||
return {
|
||||
...row,
|
||||
aid: Number(row.aid),
|
||||
}
|
||||
})[0];
|
||||
}
|
||||
|
@ -1,7 +1,8 @@
|
||||
import { DAY, MINUTE } from "$std/datetime/constants.ts";
|
||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import { SnapshotScheduleType } from "./schema.d.ts";
|
||||
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
|
||||
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
|
||||
import {SnapshotScheduleType} from "./schema.d.ts";
|
||||
import logger from "../log/logger.ts";
|
||||
|
||||
/*
|
||||
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
|
||||
@ -14,6 +15,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
|
||||
return res.rows.length > 0;
|
||||
}
|
||||
|
||||
export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
||||
const res = await client.queryObject<{ status: string }>(
|
||||
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
|
||||
[aid],
|
||||
);
|
||||
return res.rows.length > 0;
|
||||
}
|
||||
|
||||
interface Snapshot {
|
||||
created_at: number;
|
||||
views: number;
|
||||
@ -43,6 +52,14 @@ export async function findClosestSnapshot(
|
||||
};
|
||||
}
|
||||
|
||||
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
|
||||
const res = await client.queryObject<{ count: number }>(
|
||||
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
|
||||
[aid],
|
||||
);
|
||||
return res.rows[0].count >= 2;
|
||||
}
|
||||
|
||||
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
|
||||
const res = await client.queryObject<{ created_at: string; views: number }>(
|
||||
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
|
||||
@ -81,10 +98,12 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
|
||||
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
||||
*/
|
||||
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
||||
const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime));
|
||||
const allowedCount = type === "milestone" ? 2000 : 800;
|
||||
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
|
||||
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
||||
return client.queryObject(
|
||||
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
||||
[aid, type, ajustedTime.toISOString()],
|
||||
[aid, type, adjustedTime.toISOString()],
|
||||
);
|
||||
}
|
||||
|
||||
@ -92,18 +111,20 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
|
||||
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
|
||||
* @param client PostgreSQL client
|
||||
* @param expectedStartTime The expected snapshot time
|
||||
* @param allowedCounts The number of snapshots allowed in the 5-minutes windows.
|
||||
* @returns The adjusted actual snapshot time
|
||||
*/
|
||||
export async function adjustSnapshotTime(
|
||||
client: Client,
|
||||
expectedStartTime: Date,
|
||||
allowedCounts: number = 2000
|
||||
): Promise<Date> {
|
||||
const findWindowQuery = `
|
||||
WITH windows AS (
|
||||
SELECT generate_series(
|
||||
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
|
||||
$2::timestamp, -- End time: 24 hours after the target time window starts
|
||||
INTERVAL '5 MINUTES'
|
||||
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
|
||||
$2::timestamp, -- End time: 24 hours after the target time window starts
|
||||
INTERVAL '5 MINUTES'
|
||||
) AS window_start
|
||||
)
|
||||
SELECT w.window_start
|
||||
@ -112,29 +133,36 @@ export async function adjustSnapshotTime(
|
||||
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
|
||||
AND s.status = 'pending'
|
||||
GROUP BY w.window_start
|
||||
HAVING COUNT(s.*) < 2000
|
||||
HAVING COUNT(s.*) < ${allowedCounts}
|
||||
ORDER BY w.window_start
|
||||
LIMIT 1;
|
||||
`;
|
||||
for (let i = 0; i < 7; i++) {
|
||||
const now = new Date(new Date().getTime() + 5 * MINUTE);
|
||||
const nowTruncated = truncateTo5MinInterval(now);
|
||||
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
|
||||
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
|
||||
|
||||
const windowResult = await client.queryObject<{ window_start: Date }>(
|
||||
findWindowQuery,
|
||||
[nowTruncated, end],
|
||||
);
|
||||
|
||||
const windowStart = windowResult.rows[0]?.window_start;
|
||||
if (!windowStart) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return windowStart;
|
||||
const now = new Date();
|
||||
const targetTime = expectedStartTime.getTime();
|
||||
let start = new Date(targetTime - 2 * HOUR);
|
||||
if (start.getTime() <= now.getTime()) {
|
||||
start = now;
|
||||
}
|
||||
const startTruncated = truncateTo5MinInterval(start);
|
||||
const end = new Date(startTruncated.getTime() + 1 * DAY);
|
||||
|
||||
const windowResult = await client.queryObject<{ window_start: Date }>(
|
||||
findWindowQuery,
|
||||
[startTruncated, end],
|
||||
);
|
||||
|
||||
|
||||
const windowStart = windowResult.rows[0]?.window_start;
|
||||
if (!windowStart) {
|
||||
return expectedStartTime;
|
||||
}
|
||||
|
||||
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
|
||||
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
|
||||
return new Date(windowStart.getTime() + randomDelay);
|
||||
} else {
|
||||
return expectedStartTime;
|
||||
}
|
||||
return expectedStartTime;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -156,9 +184,31 @@ function truncateTo5MinInterval(timestamp: Date): Date {
|
||||
}
|
||||
|
||||
export async function getSnapshotsInNextSecond(client: Client) {
|
||||
const res = await client.queryObject<SnapshotScheduleType>(
|
||||
`SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`,
|
||||
[],
|
||||
);
|
||||
const query = `
|
||||
SELECT *
|
||||
FROM snapshot_schedule
|
||||
WHERE started_at
|
||||
BETWEEN NOW() - INTERVAL '5 seconds'
|
||||
AND NOW() + INTERVAL '1 seconds'
|
||||
`;
|
||||
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||
return res.rows;
|
||||
}
|
||||
|
||||
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
||||
return await client.queryObject(
|
||||
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
||||
[id, status],
|
||||
);
|
||||
}
|
||||
|
||||
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||
const query: string = `
|
||||
SELECT s.aid
|
||||
FROM songs s
|
||||
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||
WHERE ss.aid IS NULL
|
||||
`;
|
||||
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||
return res.rows.map((r) => Number(r.aid));
|
||||
}
|
||||
|
@ -1,20 +1,33 @@
|
||||
import { Job } from "bullmq";
|
||||
import { db } from "lib/db/init.ts";
|
||||
import { getVideosNearMilestone } from "lib/db/snapshot.ts";
|
||||
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
|
||||
import {
|
||||
findClosestSnapshot,
|
||||
getLatestSnapshot,
|
||||
getSnapshotsInNextSecond,
|
||||
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
|
||||
hasAtLeast2Snapshots,
|
||||
scheduleSnapshot,
|
||||
setSnapshotStatus,
|
||||
videoHasActiveSchedule,
|
||||
videoHasProcessingSchedule,
|
||||
} from "lib/db/snapshotSchedule.ts";
|
||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { HOUR, MINUTE } from "$std/datetime/constants.ts";
|
||||
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
||||
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
|
||||
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||
import { setBiliVideoStatus } from "lib/db/allData.ts";
|
||||
import { truncate } from "lib/utils/truncate.ts";
|
||||
|
||||
const priorityMap: { [key: string]: number } = {
|
||||
"milestone": 1,
|
||||
"normal": 3,
|
||||
};
|
||||
|
||||
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
||||
"milestone": "snapshotMilestoneVideo",
|
||||
"normal": "snapshotVideo",
|
||||
};
|
||||
|
||||
export const snapshotTickWorker = async (_job: Job) => {
|
||||
@ -23,10 +36,18 @@ export const snapshotTickWorker = async (_job: Job) => {
|
||||
const schedules = await getSnapshotsInNextSecond(client);
|
||||
for (const schedule of schedules) {
|
||||
let priority = 3;
|
||||
if (schedule.type && priorityMap[schedule.type])
|
||||
if (schedule.type && priorityMap[schedule.type]) {
|
||||
priority = priorityMap[schedule.type];
|
||||
await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority });
|
||||
}
|
||||
const aid = Number(schedule.aid);
|
||||
await SnapshotQueue.add("snapshotVideo", {
|
||||
aid: aid,
|
||||
id: Number(schedule.id),
|
||||
type: schedule.type ?? "normal",
|
||||
}, { priority });
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e as Error);
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
@ -50,9 +71,11 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||
// Immediately dispatch a snapshot if there is no snapshot yet
|
||||
if (!latestSnapshot) return 0;
|
||||
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
|
||||
if (!snapshotsEnough) return 0;
|
||||
|
||||
const currentTimestamp = Date.now();
|
||||
const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
||||
const currentTimestamp = new Date().getTime();
|
||||
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
||||
const DELTA = 0.00001;
|
||||
let minETAHours = Infinity;
|
||||
|
||||
@ -60,13 +83,15 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
||||
const date = new Date(currentTimestamp - timeInterval);
|
||||
const snapshot = await findClosestSnapshot(client, aid, date);
|
||||
if (!snapshot) continue;
|
||||
const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR;
|
||||
const viewsDiff = snapshot.views - latestSnapshot.views;
|
||||
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
|
||||
const viewsDiff = latestSnapshot.views - snapshot.views;
|
||||
if (viewsDiff <= 0) continue;
|
||||
const speed = viewsDiff / (hoursDiff + DELTA);
|
||||
const target = closetMilestone(latestSnapshot.views);
|
||||
const viewsToIncrease = target - latestSnapshot.views;
|
||||
const eta = viewsToIncrease / (speed + DELTA);
|
||||
const factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||
factor = truncate(factor, 3, 100);
|
||||
const adjustedETA = eta / factor;
|
||||
if (adjustedETA < minETAHours) {
|
||||
minETAHours = adjustedETA;
|
||||
@ -80,12 +105,17 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
||||
try {
|
||||
const videos = await getVideosNearMilestone(client);
|
||||
for (const video of videos) {
|
||||
if (await videoHasActiveSchedule(client, video.aid)) continue;
|
||||
const eta = await getAdjustedShortTermETA(client, video.aid);
|
||||
const aid = Number(video.aid);
|
||||
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||
const eta = await getAdjustedShortTermETA(client, aid);
|
||||
if (eta > 72) continue;
|
||||
const now = Date.now();
|
||||
const targetTime = now + eta * HOUR;
|
||||
await scheduleSnapshot(client, video.aid, "milestone", targetTime);
|
||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||
const maxInterval = 4 * HOUR;
|
||||
const minInterval = 1 * SECOND;
|
||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||
const targetTime = now + delay;
|
||||
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
||||
@ -94,6 +124,70 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
||||
}
|
||||
};
|
||||
|
||||
export const takeSnapshotForVideoWorker = async (_job: Job) => {
|
||||
// TODO: implement
|
||||
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||
const client = await db.connect();
|
||||
try {
|
||||
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
|
||||
for (const rawAid of aids) {
|
||||
const aid = Number(rawAid);
|
||||
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
||||
const now = Date.now();
|
||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, Infinity);
|
||||
await scheduleSnapshot(client, aid, "normal", targetTime);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
};
|
||||
|
||||
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||
const id = job.data.id;
|
||||
const aid = Number(job.data.aid);
|
||||
const type = job.data.type;
|
||||
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
|
||||
const client = await db.connect();
|
||||
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
|
||||
try {
|
||||
if (await videoHasProcessingSchedule(client, aid)) {
|
||||
return `ALREADY_PROCESSING`;
|
||||
}
|
||||
await setSnapshotStatus(client, id, "processing");
|
||||
const stat = await insertVideoSnapshot(client, aid, task);
|
||||
if (typeof stat === "number") {
|
||||
await setBiliVideoStatus(client, aid, stat);
|
||||
await setSnapshotStatus(client, id, "completed");
|
||||
return `BILI_STATUS_${stat}`;
|
||||
}
|
||||
await setSnapshotStatus(client, id, "completed");
|
||||
if (type === "normal") {
|
||||
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
|
||||
return `DONE`;
|
||||
}
|
||||
if (type !== "milestone") return `DONE`;
|
||||
const eta = await getAdjustedShortTermETA(client, aid);
|
||||
if (eta > 72) return "ETA_TOO_LONG";
|
||||
const now = Date.now();
|
||||
const targetTime = now + eta * HOUR;
|
||||
await scheduleSnapshot(client, aid, type, targetTime);
|
||||
return `DONE`;
|
||||
} catch (e) {
|
||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||
logger.warn(
|
||||
`No available proxy for aid ${job.data.aid}.`,
|
||||
"mq",
|
||||
"fn:takeSnapshotForVideoWorker",
|
||||
);
|
||||
await setSnapshotStatus(client, id, "completed");
|
||||
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
||||
return;
|
||||
}
|
||||
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
|
||||
await setSnapshotStatus(client, id, "failed");
|
||||
} finally {
|
||||
client.release();
|
||||
}
|
||||
};
|
||||
|
@ -18,11 +18,22 @@ export async function initMQ() {
|
||||
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
|
||||
every: 1 * SECOND,
|
||||
immediately: true,
|
||||
}, {
|
||||
opts: {
|
||||
removeOnComplete: 1,
|
||||
removeOnFail: 1,
|
||||
},
|
||||
});
|
||||
|
||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||
every: 5 * MINUTE,
|
||||
immediately: true,
|
||||
});
|
||||
|
||||
await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
|
||||
every: 30 * MINUTE,
|
||||
immediately: true,
|
||||
});
|
||||
|
||||
logger.log("Message queue initialized.");
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ interface ProxiesMap {
|
||||
}
|
||||
|
||||
type NetSchedulerErrorCode =
|
||||
| "NO_AVAILABLE_PROXY"
|
||||
| "NO_PROXY_AVAILABLE"
|
||||
| "PROXY_RATE_LIMITED"
|
||||
| "PROXY_NOT_FOUND"
|
||||
| "FETCH_ERROR"
|
||||
@ -143,10 +143,10 @@ class NetScheduler {
|
||||
* @param {string} method - The HTTP method to use for the request. Default is "GET".
|
||||
* @returns {Promise<any>} - A promise that resolves to the response body.
|
||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||
* - No available proxy currently: with error code NO_AVAILABLE_PROXY
|
||||
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
|
||||
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
||||
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
||||
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
|
||||
*/
|
||||
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
|
||||
// find a available proxy
|
||||
@ -156,7 +156,7 @@ class NetScheduler {
|
||||
return await this.proxyRequest<R>(url, proxyName, task, method);
|
||||
}
|
||||
}
|
||||
throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
|
||||
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
|
||||
}
|
||||
|
||||
/*
|
||||
@ -168,10 +168,11 @@ class NetScheduler {
|
||||
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
|
||||
* @returns {Promise<any>} - A promise that resolves to the response body.
|
||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||
* - Proxy not found: with error code PROXY_NOT_FOUND
|
||||
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
|
||||
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
||||
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
||||
* - Proxy not found: with error code `PROXY_NOT_FOUND`
|
||||
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
|
||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
|
||||
*/
|
||||
async proxyRequest<R>(
|
||||
url: string,
|
||||
@ -255,8 +256,6 @@ class NetScheduler {
|
||||
}
|
||||
|
||||
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
|
||||
let rawOutput: null | Uint8Array = null;
|
||||
let rawErr: null | Uint8Array = null;
|
||||
try {
|
||||
const decoder = new TextDecoder();
|
||||
const output = await new Deno.Command("aliyun", {
|
||||
@ -280,19 +279,9 @@ class NetScheduler {
|
||||
`CVSA-${region}`,
|
||||
],
|
||||
}).output();
|
||||
rawOutput = output.stdout;
|
||||
rawErr = output.stderr;
|
||||
const out = decoder.decode(output.stdout);
|
||||
const rawData = JSON.parse(out);
|
||||
if (rawData.statusCode !== 200) {
|
||||
const fileId = randomUUID();
|
||||
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
|
||||
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
|
||||
logger.log(
|
||||
`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
|
||||
"net",
|
||||
"fn:alicloudFcRequest",
|
||||
);
|
||||
throw new NetSchedulerError(
|
||||
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
||||
"ALICLOUD_PROXY_ERR",
|
||||
@ -301,16 +290,6 @@ class NetScheduler {
|
||||
return JSON.parse(JSON.parse(rawData.body)) as R;
|
||||
}
|
||||
} catch (e) {
|
||||
if (rawOutput !== null || rawErr !== null) {
|
||||
const fileId = randomUUID();
|
||||
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
|
||||
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
|
||||
logger.log(
|
||||
`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
|
||||
"net",
|
||||
"fn:alicloudFcRequest",
|
||||
);
|
||||
}
|
||||
logger.error(e as Error, "net", "fn:alicloudFcRequest");
|
||||
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
|
||||
}
|
||||
@ -369,7 +348,8 @@ Execution order for setup:
|
||||
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
|
||||
- Depends on tasks and proxies being defined to apply limiters correctly.
|
||||
4. setProviderLimiter(providerName, config):
|
||||
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
|
||||
- Call after addProxy and addTask.
|
||||
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
|
||||
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
|
||||
|
||||
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
|
||||
|
@ -1,12 +1,27 @@
|
||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
|
||||
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
||||
|
||||
export async function insertVideoStats(client: Client, aid: number, task: string) {
|
||||
/*
|
||||
* Fetch video stats from bilibili API and insert into database
|
||||
* @returns {Promise<number|VideoSnapshot>}
|
||||
* A number indicating the status code when receiving non-0 status code from bilibili,
|
||||
* otherwise an VideoSnapshot object containing the video stats
|
||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||
*/
|
||||
export async function insertVideoSnapshot(
|
||||
client: Client,
|
||||
aid: number,
|
||||
task: string,
|
||||
): Promise<number | LatestSnapshotType> {
|
||||
const data = await getVideoInfo(aid, task);
|
||||
const time = new Date().getTime();
|
||||
if (typeof data == "number") {
|
||||
return data;
|
||||
}
|
||||
const time = new Date().getTime();
|
||||
const views = data.stat.view;
|
||||
const danmakus = data.stat.danmaku;
|
||||
const replies = data.stat.reply;
|
||||
@ -14,14 +29,17 @@ export async function insertVideoStats(client: Client, aid: number, task: string
|
||||
const coins = data.stat.coin;
|
||||
const shares = data.stat.share;
|
||||
const favorites = data.stat.favorite;
|
||||
await client.queryObject(
|
||||
`
|
||||
|
||||
const query: string = `
|
||||
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||
`,
|
||||
`;
|
||||
await client.queryObject(
|
||||
query,
|
||||
[aid, views, danmakus, replies, likes, coins, shares, favorites],
|
||||
);
|
||||
return {
|
||||
|
||||
const snapshot: LatestSnapshotType = {
|
||||
aid,
|
||||
views,
|
||||
danmakus,
|
||||
@ -32,4 +50,6 @@ export async function insertVideoStats(client: Client, aid: number, task: string
|
||||
favorites,
|
||||
time,
|
||||
};
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
|
||||
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
|
||||
import logger from "lib/log/logger.ts";
|
||||
|
||||
/*
|
||||
* Fetch video metadata from bilibili API
|
||||
* @param {number} aid - The video's aid
|
||||
* @param {string} task - The task name used in scheduler. It can be one of the following:
|
||||
* - snapshotVideo
|
||||
* - getVideoInfo
|
||||
* - snapshotMilestoneVideo
|
||||
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
|
||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||
*/
|
||||
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
|
||||
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
|
||||
const data = await netScheduler.request<VideoInfoResponse>(url, task);
|
||||
|
@ -6,7 +6,7 @@ import { lockManager } from "lib/mq/lockManager.ts";
|
||||
import { WorkerError } from "lib/mq/schema.ts";
|
||||
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
||||
import {
|
||||
collectMilestoneSnapshotsWorker,
|
||||
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
|
||||
snapshotTickWorker,
|
||||
takeSnapshotForVideoWorker,
|
||||
} from "lib/mq/exec/snapshotTick.ts";
|
||||
@ -14,12 +14,14 @@ import {
|
||||
Deno.addSignalListener("SIGINT", async () => {
|
||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||
await latestVideoWorker.close(true);
|
||||
await snapshotWorker.close(true);
|
||||
Deno.exit();
|
||||
});
|
||||
|
||||
Deno.addSignalListener("SIGTERM", async () => {
|
||||
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||
await latestVideoWorker.close(true);
|
||||
await snapshotWorker.close(true);
|
||||
Deno.exit();
|
||||
});
|
||||
|
||||
@ -40,7 +42,12 @@ const latestVideoWorker = new Worker(
|
||||
break;
|
||||
}
|
||||
},
|
||||
{ connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
|
||||
{
|
||||
connection: redis as ConnectionOptions,
|
||||
concurrency: 6,
|
||||
removeOnComplete: { count: 1440 },
|
||||
removeOnFail: { count: 0 },
|
||||
},
|
||||
);
|
||||
|
||||
latestVideoWorker.on("active", () => {
|
||||
@ -69,6 +76,9 @@ const snapshotWorker = new Worker(
|
||||
case "collectMilestoneSnapshots":
|
||||
await collectMilestoneSnapshotsWorker(job);
|
||||
break;
|
||||
case "dispatchRegularSnapshots":
|
||||
await regularSnapshotsWorker(job);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user