Compare commits
4 Commits
b201bfd64d
...
7768a202b2
Author | SHA1 | Date | |
---|---|---|---|
7768a202b2 | |||
8652ac8fb7 | |||
18fc9752bb | |||
2c12310e8c |
@ -68,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
|
|||||||
const rows = queryResult.rows;
|
const rows = queryResult.rows;
|
||||||
return rows.map((row) => row.uid);
|
return rows.map((row) => row.uid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
|
||||||
|
return await client.queryObject(
|
||||||
|
`UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
|
||||||
|
[status, aid],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
12
lib/db/schema.d.ts
vendored
12
lib/db/schema.d.ts
vendored
@ -32,6 +32,18 @@ export interface VideoSnapshotType {
|
|||||||
replies: number;
|
replies: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface LatestSnapshotType {
|
||||||
|
aid: number;
|
||||||
|
time: number;
|
||||||
|
views: number;
|
||||||
|
danmakus: number;
|
||||||
|
replies: number;
|
||||||
|
likes: number;
|
||||||
|
coins: number;
|
||||||
|
shares: number;
|
||||||
|
favorites: number;
|
||||||
|
}
|
||||||
|
|
||||||
export interface SnapshotScheduleType {
|
export interface SnapshotScheduleType {
|
||||||
id: number;
|
id: number;
|
||||||
aid: number;
|
aid: number;
|
||||||
|
@ -1,35 +1,17 @@
|
|||||||
import { DAY, SECOND } from "$std/datetime/constants.ts";
|
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { VideoSnapshotType } from "lib/db/schema.d.ts";
|
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
||||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
|
||||||
|
|
||||||
export async function getVideosNearMilestone(client: Client) {
|
export async function getVideosNearMilestone(client: Client) {
|
||||||
const queryResult = await client.queryObject<VideoSnapshotType>(`
|
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||||
WITH filtered_snapshots AS (
|
SELECT ls.*
|
||||||
SELECT
|
FROM latest_video_snapshot ls
|
||||||
vs.*
|
|
||||||
FROM
|
|
||||||
video_snapshot vs
|
|
||||||
WHERE
|
|
||||||
(vs.views >= 90000 AND vs.views < 100000) OR
|
|
||||||
(vs.views >= 900000 AND vs.views < 1000000)
|
|
||||||
),
|
|
||||||
ranked_snapshots AS (
|
|
||||||
SELECT
|
|
||||||
fs.*,
|
|
||||||
ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn,
|
|
||||||
MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid
|
|
||||||
FROM
|
|
||||||
filtered_snapshots fs
|
|
||||||
INNER JOIN
|
INNER JOIN
|
||||||
songs s ON fs.aid = s.aid
|
songs s ON ls.aid = s.aid
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies
|
|
||||||
FROM
|
|
||||||
ranked_snapshots rs
|
|
||||||
WHERE
|
WHERE
|
||||||
rs.rn = 1;
|
s.deleted = false AND
|
||||||
|
(views >= 90000 AND views < 100000) OR
|
||||||
|
(views >= 900000 AND views < 1000000) OR
|
||||||
|
(views >= 9900000 AND views < 10000000)
|
||||||
`);
|
`);
|
||||||
return queryResult.rows.map((row) => {
|
return queryResult.rows.map((row) => {
|
||||||
return {
|
return {
|
||||||
@ -39,160 +21,19 @@ export async function getVideosNearMilestone(client: Client) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getUnsnapshotedSongs(client: Client) {
|
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
|
||||||
const queryResult = await client.queryObject<{ aid: bigint }>(`
|
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||||
SELECT DISTINCT s.aid
|
SELECT *
|
||||||
FROM songs s
|
FROM latest_video_snapshot
|
||||||
LEFT JOIN video_snapshot v ON s.aid = v.aid
|
|
||||||
WHERE v.aid IS NULL;
|
|
||||||
`);
|
|
||||||
return queryResult.rows.map((row) => Number(row.aid));
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function getSongSnapshotCount(client: Client, aid: number) {
|
|
||||||
const queryResult = await client.queryObject<{ count: number }>(
|
|
||||||
`
|
|
||||||
SELECT COUNT(*) AS count
|
|
||||||
FROM video_snapshot
|
|
||||||
WHERE aid = $1;
|
|
||||||
`,
|
|
||||||
[aid],
|
|
||||||
);
|
|
||||||
return queryResult.rows[0].count;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function getShortTermEtaPrediction(client: Client, aid: number) {
|
|
||||||
const queryResult = await client.queryObject<{ eta: number }>(
|
|
||||||
`
|
|
||||||
WITH old_snapshot AS (
|
|
||||||
SELECT created_at, views
|
|
||||||
FROM video_snapshot
|
|
||||||
WHERE aid = $1 AND
|
|
||||||
NOW() - created_at > '20 min'
|
|
||||||
ORDER BY created_at DESC
|
|
||||||
LIMIT 1
|
|
||||||
),
|
|
||||||
new_snapshot AS (
|
|
||||||
SELECT created_at, views
|
|
||||||
FROM video_snapshot
|
|
||||||
WHERE aid = $1
|
WHERE aid = $1
|
||||||
ORDER BY created_at DESC
|
`, [aid]);
|
||||||
LIMIT 1
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
CASE
|
|
||||||
WHEN n.views > 100000
|
|
||||||
THEN
|
|
||||||
(1000000 - n.views) -- Views remaining
|
|
||||||
/
|
|
||||||
(
|
|
||||||
(n.views - o.views) -- Views delta
|
|
||||||
/
|
|
||||||
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
|
|
||||||
+ 0.001
|
|
||||||
) -- Increment per second
|
|
||||||
ELSE
|
|
||||||
(100000 - n.views) -- Views remaining
|
|
||||||
/
|
|
||||||
(
|
|
||||||
(n.views - o.views) -- Views delta
|
|
||||||
/
|
|
||||||
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
|
|
||||||
+ 0.001
|
|
||||||
) -- Increment per second
|
|
||||||
END AS eta
|
|
||||||
FROM old_snapshot o, new_snapshot n;
|
|
||||||
`,
|
|
||||||
[aid],
|
|
||||||
);
|
|
||||||
if (queryResult.rows.length === 0) {
|
if (queryResult.rows.length === 0) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return queryResult.rows[0].eta;
|
return queryResult.rows.map((row) => {
|
||||||
}
|
return {
|
||||||
|
...row,
|
||||||
export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) {
|
aid: Number(row.aid),
|
||||||
const queryResult = await client.queryObject<{ interval: number }>(
|
}
|
||||||
`
|
})[0];
|
||||||
SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval
|
|
||||||
FROM video_snapshot
|
|
||||||
WHERE aid = $1
|
|
||||||
ORDER BY created_at DESC
|
|
||||||
LIMIT 1;
|
|
||||||
`,
|
|
||||||
[aid],
|
|
||||||
);
|
|
||||||
if (queryResult.rows.length === 0) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return queryResult.rows[0].interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
|
|
||||||
const count = await getSongSnapshotCount(client, aid);
|
|
||||||
if (count < 2) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
const queryResult = await client.queryObject<
|
|
||||||
{ views1: number; created_at1: string; views2: number; created_at2: string }
|
|
||||||
>(
|
|
||||||
`
|
|
||||||
WITH latest_snapshot AS (
|
|
||||||
SELECT
|
|
||||||
aid,
|
|
||||||
views,
|
|
||||||
created_at
|
|
||||||
FROM video_snapshot
|
|
||||||
WHERE aid = $1
|
|
||||||
ORDER BY created_at DESC
|
|
||||||
LIMIT 1
|
|
||||||
),
|
|
||||||
pairs AS (
|
|
||||||
SELECT
|
|
||||||
a.views AS views1,
|
|
||||||
a.created_at AS created_at1,
|
|
||||||
b.views AS views2,
|
|
||||||
b.created_at AS created_at2,
|
|
||||||
(b.created_at - a.created_at) AS interval
|
|
||||||
FROM video_snapshot a
|
|
||||||
JOIN latest_snapshot b
|
|
||||||
ON a.aid = b.aid
|
|
||||||
AND a.created_at < b.created_at
|
|
||||||
)
|
|
||||||
SELECT
|
|
||||||
views1,
|
|
||||||
created_at1,
|
|
||||||
views2,
|
|
||||||
created_at2
|
|
||||||
FROM (
|
|
||||||
SELECT
|
|
||||||
*,
|
|
||||||
ROW_NUMBER() OVER (
|
|
||||||
ORDER BY
|
|
||||||
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
|
|
||||||
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
|
|
||||||
) AS rn
|
|
||||||
FROM pairs
|
|
||||||
) ranked
|
|
||||||
WHERE rn = 1;
|
|
||||||
`,
|
|
||||||
[aid],
|
|
||||||
);
|
|
||||||
if (queryResult.rows.length === 0) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
const recentViewsData = queryResult.rows[0];
|
|
||||||
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
|
|
||||||
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
|
|
||||||
const intervalSec = (time2 - time1) / SECOND;
|
|
||||||
const views1 = recentViewsData.views1;
|
|
||||||
const views2 = recentViewsData.views2;
|
|
||||||
const viewsDiff = views2 - views1;
|
|
||||||
if (viewsDiff == 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
|
|
||||||
const expectedViewsDiff = nextMilestone - views2;
|
|
||||||
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
|
|
||||||
return expectedIntervalSec <= 3 * DAY;
|
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,8 @@
|
|||||||
import { DAY, MINUTE } from "$std/datetime/constants.ts";
|
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
import { SnapshotScheduleType } from "./schema.d.ts";
|
import {SnapshotScheduleType} from "./schema.d.ts";
|
||||||
|
import logger from "../log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
|
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
|
||||||
@ -14,6 +15,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
|
|||||||
return res.rows.length > 0;
|
return res.rows.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
||||||
|
const res = await client.queryObject<{ status: string }>(
|
||||||
|
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
return res.rows.length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
interface Snapshot {
|
interface Snapshot {
|
||||||
created_at: number;
|
created_at: number;
|
||||||
views: number;
|
views: number;
|
||||||
@ -43,6 +52,14 @@ export async function findClosestSnapshot(
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
|
||||||
|
const res = await client.queryObject<{ count: number }>(
|
||||||
|
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
return res.rows[0].count >= 2;
|
||||||
|
}
|
||||||
|
|
||||||
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
|
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
|
||||||
const res = await client.queryObject<{ created_at: string; views: number }>(
|
const res = await client.queryObject<{ created_at: string; views: number }>(
|
||||||
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
|
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
|
||||||
@ -81,10 +98,12 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
|
|||||||
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
|
||||||
*/
|
*/
|
||||||
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
|
||||||
const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime));
|
const allowedCount = type === "milestone" ? 2000 : 800;
|
||||||
|
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
|
||||||
|
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
|
||||||
return client.queryObject(
|
return client.queryObject(
|
||||||
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
|
||||||
[aid, type, ajustedTime.toISOString()],
|
[aid, type, adjustedTime.toISOString()],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,11 +111,13 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
|
|||||||
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
|
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
|
||||||
* @param client PostgreSQL client
|
* @param client PostgreSQL client
|
||||||
* @param expectedStartTime The expected snapshot time
|
* @param expectedStartTime The expected snapshot time
|
||||||
|
* @param allowedCounts The number of snapshots allowed in the 5-minutes windows.
|
||||||
* @returns The adjusted actual snapshot time
|
* @returns The adjusted actual snapshot time
|
||||||
*/
|
*/
|
||||||
export async function adjustSnapshotTime(
|
export async function adjustSnapshotTime(
|
||||||
client: Client,
|
client: Client,
|
||||||
expectedStartTime: Date,
|
expectedStartTime: Date,
|
||||||
|
allowedCounts: number = 2000
|
||||||
): Promise<Date> {
|
): Promise<Date> {
|
||||||
const findWindowQuery = `
|
const findWindowQuery = `
|
||||||
WITH windows AS (
|
WITH windows AS (
|
||||||
@ -112,29 +133,36 @@ export async function adjustSnapshotTime(
|
|||||||
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
|
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
|
||||||
AND s.status = 'pending'
|
AND s.status = 'pending'
|
||||||
GROUP BY w.window_start
|
GROUP BY w.window_start
|
||||||
HAVING COUNT(s.*) < 2000
|
HAVING COUNT(s.*) < ${allowedCounts}
|
||||||
ORDER BY w.window_start
|
ORDER BY w.window_start
|
||||||
LIMIT 1;
|
LIMIT 1;
|
||||||
`;
|
`;
|
||||||
for (let i = 0; i < 7; i++) {
|
const now = new Date();
|
||||||
const now = new Date(new Date().getTime() + 5 * MINUTE);
|
const targetTime = expectedStartTime.getTime();
|
||||||
const nowTruncated = truncateTo5MinInterval(now);
|
let start = new Date(targetTime - 2 * HOUR);
|
||||||
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
|
if (start.getTime() <= now.getTime()) {
|
||||||
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
|
start = now;
|
||||||
|
}
|
||||||
|
const startTruncated = truncateTo5MinInterval(start);
|
||||||
|
const end = new Date(startTruncated.getTime() + 1 * DAY);
|
||||||
|
|
||||||
const windowResult = await client.queryObject<{ window_start: Date }>(
|
const windowResult = await client.queryObject<{ window_start: Date }>(
|
||||||
findWindowQuery,
|
findWindowQuery,
|
||||||
[nowTruncated, end],
|
[startTruncated, end],
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
||||||
const windowStart = windowResult.rows[0]?.window_start;
|
const windowStart = windowResult.rows[0]?.window_start;
|
||||||
if (!windowStart) {
|
if (!windowStart) {
|
||||||
continue;
|
return expectedStartTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
return windowStart;
|
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
|
||||||
}
|
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
|
||||||
|
return new Date(windowStart.getTime() + randomDelay);
|
||||||
|
} else {
|
||||||
return expectedStartTime;
|
return expectedStartTime;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -156,9 +184,31 @@ function truncateTo5MinInterval(timestamp: Date): Date {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export async function getSnapshotsInNextSecond(client: Client) {
|
export async function getSnapshotsInNextSecond(client: Client) {
|
||||||
const res = await client.queryObject<SnapshotScheduleType>(
|
const query = `
|
||||||
`SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`,
|
SELECT *
|
||||||
[],
|
FROM snapshot_schedule
|
||||||
);
|
WHERE started_at
|
||||||
|
BETWEEN NOW() - INTERVAL '5 seconds'
|
||||||
|
AND NOW() + INTERVAL '1 seconds'
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
return res.rows;
|
return res.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function setSnapshotStatus(client: Client, id: number, status: string) {
|
||||||
|
return await client.queryObject(
|
||||||
|
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
|
||||||
|
[id, status],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||||
|
const query: string = `
|
||||||
|
SELECT s.aid
|
||||||
|
FROM songs s
|
||||||
|
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||||
|
WHERE ss.aid IS NULL
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||||
|
return res.rows.map((r) => Number(r.aid));
|
||||||
|
}
|
||||||
|
@ -1,20 +1,33 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "lib/db/init.ts";
|
||||||
import { getVideosNearMilestone } from "lib/db/snapshot.ts";
|
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
|
||||||
import {
|
import {
|
||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
|
||||||
|
hasAtLeast2Snapshots,
|
||||||
scheduleSnapshot,
|
scheduleSnapshot,
|
||||||
|
setSnapshotStatus,
|
||||||
videoHasActiveSchedule,
|
videoHasActiveSchedule,
|
||||||
|
videoHasProcessingSchedule,
|
||||||
} from "lib/db/snapshotSchedule.ts";
|
} from "lib/db/snapshotSchedule.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { HOUR, MINUTE } from "$std/datetime/constants.ts";
|
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
import { SnapshotQueue } from "lib/mq/index.ts";
|
||||||
|
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
|
||||||
|
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||||
|
import { setBiliVideoStatus } from "lib/db/allData.ts";
|
||||||
|
import { truncate } from "lib/utils/truncate.ts";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
|
"normal": 3,
|
||||||
|
};
|
||||||
|
|
||||||
|
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
||||||
|
"milestone": "snapshotMilestoneVideo",
|
||||||
|
"normal": "snapshotVideo",
|
||||||
};
|
};
|
||||||
|
|
||||||
export const snapshotTickWorker = async (_job: Job) => {
|
export const snapshotTickWorker = async (_job: Job) => {
|
||||||
@ -23,10 +36,18 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
const schedules = await getSnapshotsInNextSecond(client);
|
const schedules = await getSnapshotsInNextSecond(client);
|
||||||
for (const schedule of schedules) {
|
for (const schedule of schedules) {
|
||||||
let priority = 3;
|
let priority = 3;
|
||||||
if (schedule.type && priorityMap[schedule.type])
|
if (schedule.type && priorityMap[schedule.type]) {
|
||||||
priority = priorityMap[schedule.type];
|
priority = priorityMap[schedule.type];
|
||||||
await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority });
|
|
||||||
}
|
}
|
||||||
|
const aid = Number(schedule.aid);
|
||||||
|
await SnapshotQueue.add("snapshotVideo", {
|
||||||
|
aid: aid,
|
||||||
|
id: Number(schedule.id),
|
||||||
|
type: schedule.type ?? "normal",
|
||||||
|
}, { priority });
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error);
|
||||||
} finally {
|
} finally {
|
||||||
client.release();
|
client.release();
|
||||||
}
|
}
|
||||||
@ -50,9 +71,11 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||||
// Immediately dispatch a snapshot if there is no snapshot yet
|
// Immediately dispatch a snapshot if there is no snapshot yet
|
||||||
if (!latestSnapshot) return 0;
|
if (!latestSnapshot) return 0;
|
||||||
|
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
|
||||||
|
if (!snapshotsEnough) return 0;
|
||||||
|
|
||||||
const currentTimestamp = Date.now();
|
const currentTimestamp = new Date().getTime();
|
||||||
const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
|
||||||
const DELTA = 0.00001;
|
const DELTA = 0.00001;
|
||||||
let minETAHours = Infinity;
|
let minETAHours = Infinity;
|
||||||
|
|
||||||
@ -60,13 +83,15 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
|||||||
const date = new Date(currentTimestamp - timeInterval);
|
const date = new Date(currentTimestamp - timeInterval);
|
||||||
const snapshot = await findClosestSnapshot(client, aid, date);
|
const snapshot = await findClosestSnapshot(client, aid, date);
|
||||||
if (!snapshot) continue;
|
if (!snapshot) continue;
|
||||||
const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR;
|
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
|
||||||
const viewsDiff = snapshot.views - latestSnapshot.views;
|
const viewsDiff = latestSnapshot.views - snapshot.views;
|
||||||
|
if (viewsDiff <= 0) continue;
|
||||||
const speed = viewsDiff / (hoursDiff + DELTA);
|
const speed = viewsDiff / (hoursDiff + DELTA);
|
||||||
const target = closetMilestone(latestSnapshot.views);
|
const target = closetMilestone(latestSnapshot.views);
|
||||||
const viewsToIncrease = target - latestSnapshot.views;
|
const viewsToIncrease = target - latestSnapshot.views;
|
||||||
const eta = viewsToIncrease / (speed + DELTA);
|
const eta = viewsToIncrease / (speed + DELTA);
|
||||||
const factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
|
||||||
|
factor = truncate(factor, 3, 100);
|
||||||
const adjustedETA = eta / factor;
|
const adjustedETA = eta / factor;
|
||||||
if (adjustedETA < minETAHours) {
|
if (adjustedETA < minETAHours) {
|
||||||
minETAHours = adjustedETA;
|
minETAHours = adjustedETA;
|
||||||
@ -80,12 +105,17 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
try {
|
try {
|
||||||
const videos = await getVideosNearMilestone(client);
|
const videos = await getVideosNearMilestone(client);
|
||||||
for (const video of videos) {
|
for (const video of videos) {
|
||||||
if (await videoHasActiveSchedule(client, video.aid)) continue;
|
const aid = Number(video.aid);
|
||||||
const eta = await getAdjustedShortTermETA(client, video.aid);
|
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||||
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) continue;
|
if (eta > 72) continue;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const targetTime = now + eta * HOUR;
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
await scheduleSnapshot(client, video.aid, "milestone", targetTime);
|
const maxInterval = 4 * HOUR;
|
||||||
|
const minInterval = 1 * SECOND;
|
||||||
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
|
const targetTime = now + delay;
|
||||||
|
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
||||||
@ -94,6 +124,70 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
export const takeSnapshotForVideoWorker = async (_job: Job) => {
|
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||||
// TODO: implement
|
const client = await db.connect();
|
||||||
|
try {
|
||||||
|
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
|
||||||
|
for (const rawAid of aids) {
|
||||||
|
const aid = Number(rawAid);
|
||||||
|
if (await videoHasActiveSchedule(client, aid)) continue;
|
||||||
|
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
||||||
|
const now = Date.now();
|
||||||
|
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||||
|
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, Infinity);
|
||||||
|
await scheduleSnapshot(client, aid, "normal", targetTime);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||||
|
const id = job.data.id;
|
||||||
|
const aid = Number(job.data.aid);
|
||||||
|
const type = job.data.type;
|
||||||
|
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
|
||||||
|
const client = await db.connect();
|
||||||
|
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
|
||||||
|
try {
|
||||||
|
if (await videoHasProcessingSchedule(client, aid)) {
|
||||||
|
return `ALREADY_PROCESSING`;
|
||||||
|
}
|
||||||
|
await setSnapshotStatus(client, id, "processing");
|
||||||
|
const stat = await insertVideoSnapshot(client, aid, task);
|
||||||
|
if (typeof stat === "number") {
|
||||||
|
await setBiliVideoStatus(client, aid, stat);
|
||||||
|
await setSnapshotStatus(client, id, "completed");
|
||||||
|
return `BILI_STATUS_${stat}`;
|
||||||
|
}
|
||||||
|
await setSnapshotStatus(client, id, "completed");
|
||||||
|
if (type === "normal") {
|
||||||
|
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
|
||||||
|
return `DONE`;
|
||||||
|
}
|
||||||
|
if (type !== "milestone") return `DONE`;
|
||||||
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
|
if (eta > 72) return "ETA_TOO_LONG";
|
||||||
|
const now = Date.now();
|
||||||
|
const targetTime = now + eta * HOUR;
|
||||||
|
await scheduleSnapshot(client, aid, type, targetTime);
|
||||||
|
return `DONE`;
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||||
|
logger.warn(
|
||||||
|
`No available proxy for aid ${job.data.aid}.`,
|
||||||
|
"mq",
|
||||||
|
"fn:takeSnapshotForVideoWorker",
|
||||||
|
);
|
||||||
|
await setSnapshotStatus(client, id, "completed");
|
||||||
|
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
|
||||||
|
await setSnapshotStatus(client, id, "failed");
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
@ -18,11 +18,22 @@ export async function initMQ() {
|
|||||||
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
|
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
|
||||||
every: 1 * SECOND,
|
every: 1 * SECOND,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
|
}, {
|
||||||
|
opts: {
|
||||||
|
removeOnComplete: 1,
|
||||||
|
removeOnFail: 1,
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
|
||||||
every: 5 * MINUTE,
|
every: 5 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
|
||||||
|
every: 30 * MINUTE,
|
||||||
|
immediately: true,
|
||||||
|
});
|
||||||
|
|
||||||
logger.log("Message queue initialized.");
|
logger.log("Message queue initialized.");
|
||||||
}
|
}
|
||||||
|
@ -21,7 +21,7 @@ interface ProxiesMap {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NetSchedulerErrorCode =
|
type NetSchedulerErrorCode =
|
||||||
| "NO_AVAILABLE_PROXY"
|
| "NO_PROXY_AVAILABLE"
|
||||||
| "PROXY_RATE_LIMITED"
|
| "PROXY_RATE_LIMITED"
|
||||||
| "PROXY_NOT_FOUND"
|
| "PROXY_NOT_FOUND"
|
||||||
| "FETCH_ERROR"
|
| "FETCH_ERROR"
|
||||||
@ -143,10 +143,10 @@ class NetScheduler {
|
|||||||
* @param {string} method - The HTTP method to use for the request. Default is "GET".
|
* @param {string} method - The HTTP method to use for the request. Default is "GET".
|
||||||
* @returns {Promise<any>} - A promise that resolves to the response body.
|
* @returns {Promise<any>} - A promise that resolves to the response body.
|
||||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
* - No available proxy currently: with error code NO_AVAILABLE_PROXY
|
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||||
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
|
||||||
*/
|
*/
|
||||||
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
|
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
|
||||||
// find a available proxy
|
// find a available proxy
|
||||||
@ -156,7 +156,7 @@ class NetScheduler {
|
|||||||
return await this.proxyRequest<R>(url, proxyName, task, method);
|
return await this.proxyRequest<R>(url, proxyName, task, method);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
|
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -168,10 +168,11 @@ class NetScheduler {
|
|||||||
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
|
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
|
||||||
* @returns {Promise<any>} - A promise that resolves to the response body.
|
* @returns {Promise<any>} - A promise that resolves to the response body.
|
||||||
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
* - Proxy not found: with error code PROXY_NOT_FOUND
|
* - Proxy not found: with error code `PROXY_NOT_FOUND`
|
||||||
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
|
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
|
||||||
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
|
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
|
||||||
*/
|
*/
|
||||||
async proxyRequest<R>(
|
async proxyRequest<R>(
|
||||||
url: string,
|
url: string,
|
||||||
@ -255,8 +256,6 @@ class NetScheduler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
|
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
|
||||||
let rawOutput: null | Uint8Array = null;
|
|
||||||
let rawErr: null | Uint8Array = null;
|
|
||||||
try {
|
try {
|
||||||
const decoder = new TextDecoder();
|
const decoder = new TextDecoder();
|
||||||
const output = await new Deno.Command("aliyun", {
|
const output = await new Deno.Command("aliyun", {
|
||||||
@ -280,19 +279,9 @@ class NetScheduler {
|
|||||||
`CVSA-${region}`,
|
`CVSA-${region}`,
|
||||||
],
|
],
|
||||||
}).output();
|
}).output();
|
||||||
rawOutput = output.stdout;
|
|
||||||
rawErr = output.stderr;
|
|
||||||
const out = decoder.decode(output.stdout);
|
const out = decoder.decode(output.stdout);
|
||||||
const rawData = JSON.parse(out);
|
const rawData = JSON.parse(out);
|
||||||
if (rawData.statusCode !== 200) {
|
if (rawData.statusCode !== 200) {
|
||||||
const fileId = randomUUID();
|
|
||||||
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
|
|
||||||
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
|
|
||||||
logger.log(
|
|
||||||
`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
|
|
||||||
"net",
|
|
||||||
"fn:alicloudFcRequest",
|
|
||||||
);
|
|
||||||
throw new NetSchedulerError(
|
throw new NetSchedulerError(
|
||||||
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
||||||
"ALICLOUD_PROXY_ERR",
|
"ALICLOUD_PROXY_ERR",
|
||||||
@ -301,16 +290,6 @@ class NetScheduler {
|
|||||||
return JSON.parse(JSON.parse(rawData.body)) as R;
|
return JSON.parse(JSON.parse(rawData.body)) as R;
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (rawOutput !== null || rawErr !== null) {
|
|
||||||
const fileId = randomUUID();
|
|
||||||
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
|
|
||||||
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
|
|
||||||
logger.log(
|
|
||||||
`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
|
|
||||||
"net",
|
|
||||||
"fn:alicloudFcRequest",
|
|
||||||
);
|
|
||||||
}
|
|
||||||
logger.error(e as Error, "net", "fn:alicloudFcRequest");
|
logger.error(e as Error, "net", "fn:alicloudFcRequest");
|
||||||
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
|
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
|
||||||
}
|
}
|
||||||
@ -369,7 +348,8 @@ Execution order for setup:
|
|||||||
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
|
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
|
||||||
- Depends on tasks and proxies being defined to apply limiters correctly.
|
- Depends on tasks and proxies being defined to apply limiters correctly.
|
||||||
4. setProviderLimiter(providerName, config):
|
4. setProviderLimiter(providerName, config):
|
||||||
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
|
- Call after addProxy and addTask.
|
||||||
|
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
|
||||||
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
|
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
|
||||||
|
|
||||||
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
|
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).
|
||||||
|
@ -1,12 +1,27 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
|
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
|
||||||
|
import { LatestSnapshotType } from "lib/db/schema.d.ts";
|
||||||
|
|
||||||
export async function insertVideoStats(client: Client, aid: number, task: string) {
|
/*
|
||||||
|
* Fetch video stats from bilibili API and insert into database
|
||||||
|
* @returns {Promise<number|VideoSnapshot>}
|
||||||
|
* A number indicating the status code when receiving non-0 status code from bilibili,
|
||||||
|
* otherwise an VideoSnapshot object containing the video stats
|
||||||
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
|
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||||
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
|
*/
|
||||||
|
export async function insertVideoSnapshot(
|
||||||
|
client: Client,
|
||||||
|
aid: number,
|
||||||
|
task: string,
|
||||||
|
): Promise<number | LatestSnapshotType> {
|
||||||
const data = await getVideoInfo(aid, task);
|
const data = await getVideoInfo(aid, task);
|
||||||
const time = new Date().getTime();
|
|
||||||
if (typeof data == "number") {
|
if (typeof data == "number") {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
const time = new Date().getTime();
|
||||||
const views = data.stat.view;
|
const views = data.stat.view;
|
||||||
const danmakus = data.stat.danmaku;
|
const danmakus = data.stat.danmaku;
|
||||||
const replies = data.stat.reply;
|
const replies = data.stat.reply;
|
||||||
@ -14,14 +29,17 @@ export async function insertVideoStats(client: Client, aid: number, task: string
|
|||||||
const coins = data.stat.coin;
|
const coins = data.stat.coin;
|
||||||
const shares = data.stat.share;
|
const shares = data.stat.share;
|
||||||
const favorites = data.stat.favorite;
|
const favorites = data.stat.favorite;
|
||||||
await client.queryObject(
|
|
||||||
`
|
const query: string = `
|
||||||
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
||||||
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
`,
|
`;
|
||||||
|
await client.queryObject(
|
||||||
|
query,
|
||||||
[aid, views, danmakus, replies, likes, coins, shares, favorites],
|
[aid, views, danmakus, replies, likes, coins, shares, favorites],
|
||||||
);
|
);
|
||||||
return {
|
|
||||||
|
const snapshot: LatestSnapshotType = {
|
||||||
aid,
|
aid,
|
||||||
views,
|
views,
|
||||||
danmakus,
|
danmakus,
|
||||||
@ -32,4 +50,6 @@ export async function insertVideoStats(client: Client, aid: number, task: string
|
|||||||
favorites,
|
favorites,
|
||||||
time,
|
time,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
return snapshot;
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
|
|||||||
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
|
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fetch video metadata from bilibili API
|
||||||
|
* @param {number} aid - The video's aid
|
||||||
|
* @param {string} task - The task name used in scheduler. It can be one of the following:
|
||||||
|
* - snapshotVideo
|
||||||
|
* - getVideoInfo
|
||||||
|
* - snapshotMilestoneVideo
|
||||||
|
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
|
||||||
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
|
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||||
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
|
*/
|
||||||
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
|
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
|
||||||
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
|
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
|
||||||
const data = await netScheduler.request<VideoInfoResponse>(url, task);
|
const data = await netScheduler.request<VideoInfoResponse>(url, task);
|
||||||
|
@ -6,7 +6,7 @@ import { lockManager } from "lib/mq/lockManager.ts";
|
|||||||
import { WorkerError } from "lib/mq/schema.ts";
|
import { WorkerError } from "lib/mq/schema.ts";
|
||||||
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
|
||||||
import {
|
import {
|
||||||
collectMilestoneSnapshotsWorker,
|
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
} from "lib/mq/exec/snapshotTick.ts";
|
} from "lib/mq/exec/snapshotTick.ts";
|
||||||
@ -14,12 +14,14 @@ import {
|
|||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
});
|
});
|
||||||
|
|
||||||
Deno.addSignalListener("SIGTERM", async () => {
|
Deno.addSignalListener("SIGTERM", async () => {
|
||||||
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -40,7 +42,12 @@ const latestVideoWorker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
|
{
|
||||||
|
connection: redis as ConnectionOptions,
|
||||||
|
concurrency: 6,
|
||||||
|
removeOnComplete: { count: 1440 },
|
||||||
|
removeOnFail: { count: 0 },
|
||||||
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
latestVideoWorker.on("active", () => {
|
latestVideoWorker.on("active", () => {
|
||||||
@ -69,6 +76,9 @@ const snapshotWorker = new Worker(
|
|||||||
case "collectMilestoneSnapshots":
|
case "collectMilestoneSnapshots":
|
||||||
await collectMilestoneSnapshotsWorker(job);
|
await collectMilestoneSnapshotsWorker(job);
|
||||||
break;
|
break;
|
||||||
|
case "dispatchRegularSnapshots":
|
||||||
|
await regularSnapshotsWorker(job);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user