Compare commits

...

4 Commits

10 changed files with 309 additions and 271 deletions

View File

@ -68,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) {
const rows = queryResult.rows;
return rows.map((row) => row.uid);
}
export async function setBiliVideoStatus(client: Client, aid: number, status: number) {
return await client.queryObject(
`UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`,
[status, aid],
);
}

12
lib/db/schema.d.ts vendored
View File

@ -32,6 +32,18 @@ export interface VideoSnapshotType {
replies: number;
}
export interface LatestSnapshotType {
aid: number;
time: number;
views: number;
danmakus: number;
replies: number;
likes: number;
coins: number;
shares: number;
favorites: number;
}
export interface SnapshotScheduleType {
id: number;
aid: number;

View File

@ -1,35 +1,17 @@
import { DAY, SECOND } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { VideoSnapshotType } from "lib/db/schema.d.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
export async function getVideosNearMilestone(client: Client) {
const queryResult = await client.queryObject<VideoSnapshotType>(`
WITH filtered_snapshots AS (
SELECT
vs.*
FROM
video_snapshot vs
WHERE
(vs.views >= 90000 AND vs.views < 100000) OR
(vs.views >= 900000 AND vs.views < 1000000)
),
ranked_snapshots AS (
SELECT
fs.*,
ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn,
MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid
FROM
filtered_snapshots fs
INNER JOIN
songs s ON fs.aid = s.aid
)
SELECT
rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies
FROM
ranked_snapshots rs
WHERE
rs.rn = 1;
const queryResult = await client.queryObject<LatestSnapshotType>(`
SELECT ls.*
FROM latest_video_snapshot ls
INNER JOIN
songs s ON ls.aid = s.aid
WHERE
s.deleted = false AND
(views >= 90000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
(views >= 9900000 AND views < 10000000)
`);
return queryResult.rows.map((row) => {
return {
@ -39,160 +21,19 @@ export async function getVideosNearMilestone(client: Client) {
});
}
export async function getUnsnapshotedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: bigint }>(`
SELECT DISTINCT s.aid
FROM songs s
LEFT JOIN video_snapshot v ON s.aid = v.aid
WHERE v.aid IS NULL;
`);
return queryResult.rows.map((row) => Number(row.aid));
}
export async function getSongSnapshotCount(client: Client, aid: number) {
const queryResult = await client.queryObject<{ count: number }>(
`
SELECT COUNT(*) AS count
FROM video_snapshot
WHERE aid = $1;
`,
[aid],
);
return queryResult.rows[0].count;
}
export async function getShortTermEtaPrediction(client: Client, aid: number) {
const queryResult = await client.queryObject<{ eta: number }>(
`
WITH old_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1 AND
NOW() - created_at > '20 min'
ORDER BY created_at DESC
LIMIT 1
),
new_snapshot AS (
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
)
SELECT
CASE
WHEN n.views > 100000
THEN
(1000000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
ELSE
(100000 - n.views) -- Views remaining
/
(
(n.views - o.views) -- Views delta
/
(EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds
+ 0.001
) -- Increment per second
END AS eta
FROM old_snapshot o, new_snapshot n;
`,
[aid],
);
export async function getLatestVideoSnapshot(client: Client, aid: number): Promise<null | LatestSnapshotType> {
const queryResult = await client.queryObject<LatestSnapshotType>(`
SELECT *
FROM latest_video_snapshot
WHERE aid = $1
`, [aid]);
if (queryResult.rows.length === 0) {
return null;
}
return queryResult.rows[0].eta;
}
export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) {
const queryResult = await client.queryObject<{ interval: number }>(
`
SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return queryResult.rows[0].interval;
}
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
const count = await getSongSnapshotCount(client, aid);
if (count < 2) {
return true;
}
const queryResult = await client.queryObject<
{ views1: number; created_at1: string; views2: number; created_at2: string }
>(
`
WITH latest_snapshot AS (
SELECT
aid,
views,
created_at
FROM video_snapshot
WHERE aid = $1
ORDER BY created_at DESC
LIMIT 1
),
pairs AS (
SELECT
a.views AS views1,
a.created_at AS created_at1,
b.views AS views2,
b.created_at AS created_at2,
(b.created_at - a.created_at) AS interval
FROM video_snapshot a
JOIN latest_snapshot b
ON a.aid = b.aid
AND a.created_at < b.created_at
)
SELECT
views1,
created_at1,
views2,
created_at2
FROM (
SELECT
*,
ROW_NUMBER() OVER (
ORDER BY
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
) AS rn
FROM pairs
) ranked
WHERE rn = 1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return true;
}
const recentViewsData = queryResult.rows[0];
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
const intervalSec = (time2 - time1) / SECOND;
const views1 = recentViewsData.views1;
const views2 = recentViewsData.views2;
const viewsDiff = views2 - views1;
if (viewsDiff == 0) {
return false;
}
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
const expectedViewsDiff = nextMilestone - views2;
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
return expectedIntervalSec <= 3 * DAY;
return queryResult.rows.map((row) => {
return {
...row,
aid: Number(row.aid),
}
})[0];
}

View File

@ -1,7 +1,8 @@
import { DAY, MINUTE } from "$std/datetime/constants.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { SnapshotScheduleType } from "./schema.d.ts";
import {DAY, HOUR, MINUTE} from "$std/datetime/constants.ts";
import {Client} from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import {formatTimestampToPsql} from "lib/utils/formatTimestampToPostgre.ts";
import {SnapshotScheduleType} from "./schema.d.ts";
import logger from "../log/logger.ts";
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
@ -14,6 +15,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
return res.rows.length > 0;
}
export async function videoHasProcessingSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
[aid],
);
return res.rows.length > 0;
}
interface Snapshot {
created_at: number;
views: number;
@ -43,6 +52,14 @@ export async function findClosestSnapshot(
};
}
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
[aid],
);
return res.rows[0].count >= 2;
}
export async function getLatestSnapshot(client: Client, aid: number): Promise<Snapshot | null> {
const res = await client.queryObject<{ created_at: string; views: number }>(
`SELECT created_at, views FROM video_snapshot WHERE aid = $1 ORDER BY created_at DESC LIMIT 1`,
@ -81,10 +98,12 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
const ajustedTime = await adjustSnapshotTime(client, new Date(targetTime));
const allowedCount = type === "milestone" ? 2000 : 800;
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
[aid, type, ajustedTime.toISOString()],
[aid, type, adjustedTime.toISOString()],
);
}
@ -92,18 +111,20 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in the 5-minutes windows.
* @returns The adjusted actual snapshot time
*/
export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date,
allowedCounts: number = 2000
): Promise<Date> {
const findWindowQuery = `
WITH windows AS (
SELECT generate_series(
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
$2::timestamp, -- End time: 24 hours after the target time window starts
INTERVAL '5 MINUTES'
$1::timestamp, -- Start time: current time truncated to the nearest 5-minute window
$2::timestamp, -- End time: 24 hours after the target time window starts
INTERVAL '5 MINUTES'
) AS window_start
)
SELECT w.window_start
@ -112,29 +133,36 @@ export async function adjustSnapshotTime(
AND s.started_at < w.window_start + INTERVAL '5 MINUTES'
AND s.status = 'pending'
GROUP BY w.window_start
HAVING COUNT(s.*) < 2000
HAVING COUNT(s.*) < ${allowedCounts}
ORDER BY w.window_start
LIMIT 1;
`;
for (let i = 0; i < 7; i++) {
const now = new Date(new Date().getTime() + 5 * MINUTE);
const nowTruncated = truncateTo5MinInterval(now);
const currentWindowStart = truncateTo5MinInterval(expectedStartTime);
const end = new Date(currentWindowStart.getTime() + 1 * DAY);
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[nowTruncated, end],
);
const windowStart = windowResult.rows[0]?.window_start;
if (!windowStart) {
continue;
}
return windowStart;
const now = new Date();
const targetTime = expectedStartTime.getTime();
let start = new Date(targetTime - 2 * HOUR);
if (start.getTime() <= now.getTime()) {
start = now;
}
const startTruncated = truncateTo5MinInterval(start);
const end = new Date(startTruncated.getTime() + 1 * DAY);
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[startTruncated, end],
);
const windowStart = windowResult.rows[0]?.window_start;
if (!windowStart) {
return expectedStartTime;
}
if (windowStart.getTime() > new Date().getTime() + 5 * MINUTE) {
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} else {
return expectedStartTime;
}
return expectedStartTime;
}
/**
@ -156,9 +184,31 @@ function truncateTo5MinInterval(timestamp: Date): Date {
}
export async function getSnapshotsInNextSecond(client: Client) {
const res = await client.queryObject<SnapshotScheduleType>(
`SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`,
[],
);
const query = `
SELECT *
FROM snapshot_schedule
WHERE started_at
BETWEEN NOW() - INTERVAL '5 seconds'
AND NOW() + INTERVAL '1 seconds'
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;
}
export async function setSnapshotStatus(client: Client, id: number, status: string) {
return await client.queryObject(
`UPDATE snapshot_schedule SET status = $2 WHERE id = $1`,
[id, status],
);
}
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
const query: string = `
SELECT s.aid
FROM songs s
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
WHERE ss.aid IS NULL
`;
const res = await client.queryObject<{ aid: number }>(query, []);
return res.rows.map((r) => Number(r.aid));
}

View File

@ -1,20 +1,33 @@
import { Job } from "bullmq";
import { db } from "lib/db/init.ts";
import { getVideosNearMilestone } from "lib/db/snapshot.ts";
import {getLatestVideoSnapshot, getVideosNearMilestone} from "lib/db/snapshot.ts";
import {
findClosestSnapshot,
getLatestSnapshot,
getSnapshotsInNextSecond,
getSnapshotsInNextSecond, getVideosWithoutActiveSnapshotSchedule,
hasAtLeast2Snapshots,
scheduleSnapshot,
setSnapshotStatus,
videoHasActiveSchedule,
videoHasProcessingSchedule,
} from "lib/db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR, MINUTE } from "$std/datetime/constants.ts";
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
import logger from "lib/log/logger.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts";
const priorityMap: { [key: string]: number } = {
"milestone": 1,
"normal": 3,
};
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
};
export const snapshotTickWorker = async (_job: Job) => {
@ -23,10 +36,18 @@ export const snapshotTickWorker = async (_job: Job) => {
const schedules = await getSnapshotsInNextSecond(client);
for (const schedule of schedules) {
let priority = 3;
if (schedule.type && priorityMap[schedule.type])
if (schedule.type && priorityMap[schedule.type]) {
priority = priorityMap[schedule.type];
await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority });
}
const aid = Number(schedule.aid);
await SnapshotQueue.add("snapshotVideo", {
aid: aid,
id: Number(schedule.id),
type: schedule.type ?? "normal",
}, { priority });
}
} catch (e) {
logger.error(e as Error);
} finally {
client.release();
}
@ -50,9 +71,11 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const latestSnapshot = await getLatestSnapshot(client, aid);
// Immediately dispatch a snapshot if there is no snapshot yet
if (!latestSnapshot) return 0;
const snapshotsEnough = await hasAtLeast2Snapshots(client, aid);
if (!snapshotsEnough) return 0;
const currentTimestamp = Date.now();
const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const currentTimestamp = new Date().getTime();
const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR];
const DELTA = 0.00001;
let minETAHours = Infinity;
@ -60,13 +83,15 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => {
const date = new Date(currentTimestamp - timeInterval);
const snapshot = await findClosestSnapshot(client, aid, date);
if (!snapshot) continue;
const hoursDiff = (currentTimestamp - snapshot.created_at) / HOUR;
const viewsDiff = snapshot.views - latestSnapshot.views;
const hoursDiff = (latestSnapshot.created_at - snapshot.created_at) / HOUR;
const viewsDiff = latestSnapshot.views - snapshot.views;
if (viewsDiff <= 0) continue;
const speed = viewsDiff / (hoursDiff + DELTA);
const target = closetMilestone(latestSnapshot.views);
const viewsToIncrease = target - latestSnapshot.views;
const eta = viewsToIncrease / (speed + DELTA);
const factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
let factor = log(2.97 / log(viewsToIncrease + 1), 1.14);
factor = truncate(factor, 3, 100);
const adjustedETA = eta / factor;
if (adjustedETA < minETAHours) {
minETAHours = adjustedETA;
@ -80,12 +105,17 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
try {
const videos = await getVideosNearMilestone(client);
for (const video of videos) {
if (await videoHasActiveSchedule(client, video.aid)) continue;
const eta = await getAdjustedShortTermETA(client, video.aid);
const aid = Number(video.aid);
if (await videoHasActiveSchedule(client, aid)) continue;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) continue;
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, video.aid, "milestone", targetTime);
const scheduledNextSnapshotDelay = eta * HOUR;
const maxInterval = 4 * HOUR;
const minInterval = 1 * SECOND;
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
const targetTime = now + delay;
await scheduleSnapshot(client, aid, "milestone", targetTime);
}
} catch (e) {
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
@ -94,6 +124,70 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
}
};
export const takeSnapshotForVideoWorker = async (_job: Job) => {
// TODO: implement
export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
try {
const aids = await getVideosWithoutActiveSnapshotSchedule(client);
for (const rawAid of aids) {
const aid = Number(rawAid);
if (await videoHasActiveSchedule(client, aid)) continue;
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, Infinity);
await scheduleSnapshot(client, aid, "normal", targetTime);
}
} catch (e) {
logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
} finally {
client.release();
}
};
export const takeSnapshotForVideoWorker = async (job: Job) => {
const id = job.data.id;
const aid = Number(job.data.aid);
const type = job.data.type;
const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
const client = await db.connect();
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
try {
if (await videoHasProcessingSchedule(client, aid)) {
return `ALREADY_PROCESSING`;
}
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed");
return `BILI_STATUS_${stat}`;
}
await setSnapshotStatus(client, id, "completed");
if (type === "normal") {
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
return `DONE`;
}
if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) return "ETA_TOO_LONG";
const now = Date.now();
const targetTime = now + eta * HOUR;
await scheduleSnapshot(client, aid, type, targetTime);
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for aid ${job.data.aid}.`,
"mq",
"fn:takeSnapshotForVideoWorker",
);
await setSnapshotStatus(client, id, "completed");
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
return;
}
logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
await setSnapshotStatus(client, id, "failed");
} finally {
client.release();
}
};

View File

@ -18,11 +18,22 @@ export async function initMQ() {
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
every: 1 * SECOND,
immediately: true,
}, {
opts: {
removeOnComplete: 1,
removeOnFail: 1,
},
});
await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
every: 5 * MINUTE,
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("dispatchRegularSnapshots", {
every: 30 * MINUTE,
immediately: true,
});
logger.log("Message queue initialized.");
}

View File

@ -21,7 +21,7 @@ interface ProxiesMap {
}
type NetSchedulerErrorCode =
| "NO_AVAILABLE_PROXY"
| "NO_PROXY_AVAILABLE"
| "PROXY_RATE_LIMITED"
| "PROXY_NOT_FOUND"
| "FETCH_ERROR"
@ -143,10 +143,10 @@ class NetScheduler {
* @param {string} method - The HTTP method to use for the request. Default is "GET".
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No available proxy currently: with error code NO_AVAILABLE_PROXY
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
// find a available proxy
@ -156,7 +156,7 @@ class NetScheduler {
return await this.proxyRequest<R>(url, proxyName, task, method);
}
}
throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY");
throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE");
}
/*
@ -168,10 +168,11 @@ class NetScheduler {
* @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false.
* @returns {Promise<any>} - A promise that resolves to the response body.
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - Proxy not found: with error code PROXY_NOT_FOUND
* - Proxy is under rate limit: with error code PROXY_RATE_LIMITED
* - The native `fetch` function threw an error: with error code FETCH_ERROR
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
* - Proxy not found: with error code `PROXY_NOT_FOUND`
* - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
* - The proxy type is not supported: with error code `NOT_IMPLEMENTED`
*/
async proxyRequest<R>(
url: string,
@ -255,8 +256,6 @@ class NetScheduler {
}
private async alicloudFcRequest<R>(url: string, region: string): Promise<R> {
let rawOutput: null | Uint8Array = null;
let rawErr: null | Uint8Array = null;
try {
const decoder = new TextDecoder();
const output = await new Deno.Command("aliyun", {
@ -280,19 +279,9 @@ class NetScheduler {
`CVSA-${region}`,
],
}).output();
rawOutput = output.stdout;
rawErr = output.stderr;
const out = decoder.decode(output.stdout);
const rawData = JSON.parse(out);
if (rawData.statusCode !== 200) {
const fileId = randomUUID();
await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout);
await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr);
logger.log(
`Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
"net",
"fn:alicloudFcRequest",
);
throw new NetSchedulerError(
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
"ALICLOUD_PROXY_ERR",
@ -301,16 +290,6 @@ class NetScheduler {
return JSON.parse(JSON.parse(rawData.body)) as R;
}
} catch (e) {
if (rawOutput !== null || rawErr !== null) {
const fileId = randomUUID();
rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput);
rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr);
logger.log(
`Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`,
"net",
"fn:alicloudFcRequest",
);
}
logger.error(e as Error, "net", "fn:alicloudFcRequest");
throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e);
}
@ -369,7 +348,8 @@ Execution order for setup:
- Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies.
- Depends on tasks and proxies being defined to apply limiters correctly.
4. setProviderLimiter(providerName, config):
- Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Call after addProxy and addTask.
- It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider.
- Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to.
In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter).

View File

@ -1,12 +1,27 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getVideoInfo } from "lib/net/getVideoInfo.ts";
import { LatestSnapshotType } from "lib/db/schema.d.ts";
export async function insertVideoStats(client: Client, aid: number, task: string) {
/*
* Fetch video stats from bilibili API and insert into database
* @returns {Promise<number|VideoSnapshot>}
* A number indicating the status code when receiving non-0 status code from bilibili,
* otherwise an VideoSnapshot object containing the video stats
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function insertVideoSnapshot(
client: Client,
aid: number,
task: string,
): Promise<number | LatestSnapshotType> {
const data = await getVideoInfo(aid, task);
const time = new Date().getTime();
if (typeof data == "number") {
return data;
}
const time = new Date().getTime();
const views = data.stat.view;
const danmakus = data.stat.danmaku;
const replies = data.stat.reply;
@ -14,14 +29,17 @@ export async function insertVideoStats(client: Client, aid: number, task: string
const coins = data.stat.coin;
const shares = data.stat.share;
const favorites = data.stat.favorite;
await client.queryObject(
`
const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`,
`;
await client.queryObject(
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);
return {
const snapshot: LatestSnapshotType = {
aid,
views,
danmakus,
@ -32,4 +50,6 @@ export async function insertVideoStats(client: Client, aid: number, task: string
favorites,
time,
};
return snapshot;
}

View File

@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts";
import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts";
import logger from "lib/log/logger.ts";
/*
* Fetch video metadata from bilibili API
* @param {number} aid - The video's aid
* @param {string} task - The task name used in scheduler. It can be one of the following:
* - snapshotVideo
* - getVideoInfo
* - snapshotMilestoneVideo
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
* @throws {NetSchedulerError} - The error will be thrown in following cases:
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
*/
export async function getVideoInfo(aid: number, task: string): Promise<VideoInfoData | number> {
const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`;
const data = await netScheduler.request<VideoInfoResponse>(url, task);

View File

@ -6,7 +6,7 @@ import { lockManager } from "lib/mq/lockManager.ts";
import { WorkerError } from "lib/mq/schema.ts";
import { getVideoInfoWorker } from "lib/mq/exec/getLatestVideos.ts";
import {
collectMilestoneSnapshotsWorker,
collectMilestoneSnapshotsWorker, regularSnapshotsWorker,
snapshotTickWorker,
takeSnapshotForVideoWorker,
} from "lib/mq/exec/snapshotTick.ts";
@ -14,12 +14,14 @@ import {
Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit();
});
Deno.addSignalListener("SIGTERM", async () => {
logger.log("SIGTERM Received: Shutting down workers...", "mq");
await latestVideoWorker.close(true);
await snapshotWorker.close(true);
Deno.exit();
});
@ -40,7 +42,12 @@ const latestVideoWorker = new Worker(
break;
}
},
{ connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } },
{
connection: redis as ConnectionOptions,
concurrency: 6,
removeOnComplete: { count: 1440 },
removeOnFail: { count: 0 },
},
);
latestVideoWorker.on("active", () => {
@ -69,6 +76,9 @@ const snapshotWorker = new Worker(
case "collectMilestoneSnapshots":
await collectMilestoneSnapshotsWorker(job);
break;
case "dispatchRegularSnapshots":
await regularSnapshotsWorker(job);
break;
default:
break;
}