diff --git a/lib/db/allData.ts b/lib/db/allData.ts index 00fe22e..6e9c509 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -68,3 +68,10 @@ export async function getUnArchivedBiliUsers(client: Client) { const rows = queryResult.rows; return rows.map((row) => row.uid); } + +export async function setBiliVideoStatus(client: Client, aid: number, status: number) { + return await client.queryObject( + `UPDATE bilibili_metadata SET status = $1 WHERE aid = $2`, + [status, aid], + ); +} diff --git a/lib/db/schema.d.ts b/lib/db/schema.d.ts index 983389c..d030308 100644 --- a/lib/db/schema.d.ts +++ b/lib/db/schema.d.ts @@ -32,6 +32,18 @@ export interface VideoSnapshotType { replies: number; } +export interface LatestSnapshotType { + aid: number; + time: number; + views: number; + danmakus: number; + replies: number; + likes: number; + coins: number; + shares: number; + favorites: number; +} + export interface SnapshotScheduleType { id: number; aid: number; diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index d838899..be7ec26 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -1,35 +1,17 @@ -import { DAY, SECOND } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { VideoSnapshotType } from "lib/db/schema.d.ts"; -import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; +import { LatestSnapshotType } from "lib/db/schema.d.ts"; export async function getVideosNearMilestone(client: Client) { - const queryResult = await client.queryObject(` - WITH filtered_snapshots AS ( - SELECT - vs.* - FROM - video_snapshot vs - WHERE - (vs.views >= 90000 AND vs.views < 100000) OR - (vs.views >= 900000 AND vs.views < 1000000) - ), - ranked_snapshots AS ( - SELECT - fs.*, - ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn, - MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid - FROM - filtered_snapshots fs - INNER JOIN - songs s ON fs.aid = s.aid - ) - SELECT - rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies - FROM - ranked_snapshots rs - WHERE - rs.rn = 1; + const queryResult = await client.queryObject(` + SELECT ls.* + FROM latest_video_snapshot ls + INNER JOIN + songs s ON ls.aid = s.aid + WHERE + s.deleted = false AND + (views >= 90000 AND views < 100000) OR + (views >= 900000 AND views < 1000000) OR + (views >= 9900000 AND views < 10000000) `); return queryResult.rows.map((row) => { return { @@ -38,161 +20,3 @@ export async function getVideosNearMilestone(client: Client) { }; }); } - -export async function getUnsnapshotedSongs(client: Client) { - const queryResult = await client.queryObject<{ aid: bigint }>(` - SELECT DISTINCT s.aid - FROM songs s - LEFT JOIN video_snapshot v ON s.aid = v.aid - WHERE v.aid IS NULL; - `); - return queryResult.rows.map((row) => Number(row.aid)); -} - -export async function getSongSnapshotCount(client: Client, aid: number) { - const queryResult = await client.queryObject<{ count: number }>( - ` - SELECT COUNT(*) AS count - FROM video_snapshot - WHERE aid = $1; - `, - [aid], - ); - return queryResult.rows[0].count; -} - -export async function getShortTermEtaPrediction(client: Client, aid: number) { - const queryResult = await client.queryObject<{ eta: number }>( - ` - WITH old_snapshot AS ( - SELECT created_at, views - FROM video_snapshot - WHERE aid = $1 AND - NOW() - created_at > '20 min' - ORDER BY created_at DESC - LIMIT 1 - ), - new_snapshot AS ( - SELECT created_at, views - FROM video_snapshot - WHERE aid = $1 - ORDER BY created_at DESC - LIMIT 1 - ) - SELECT - CASE - WHEN n.views > 100000 - THEN - (1000000 - n.views) -- Views remaining - / - ( - (n.views - o.views) -- Views delta - / - (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds - + 0.001 - ) -- Increment per second - ELSE - (100000 - n.views) -- Views remaining - / - ( - (n.views - o.views) -- Views delta - / - (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.001) -- Time delta in seconds - + 0.001 - ) -- Increment per second - END AS eta - FROM old_snapshot o, new_snapshot n; - `, - [aid], - ); - if (queryResult.rows.length === 0) { - return null; - } - return queryResult.rows[0].eta; -} - -export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) { - const queryResult = await client.queryObject<{ interval: number }>( - ` - SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval - FROM video_snapshot - WHERE aid = $1 - ORDER BY created_at DESC - LIMIT 1; - `, - [aid], - ); - if (queryResult.rows.length === 0) { - return null; - } - return queryResult.rows[0].interval; -} - -export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) { - const count = await getSongSnapshotCount(client, aid); - if (count < 2) { - return true; - } - const queryResult = await client.queryObject< - { views1: number; created_at1: string; views2: number; created_at2: string } - >( - ` - WITH latest_snapshot AS ( - SELECT - aid, - views, - created_at - FROM video_snapshot - WHERE aid = $1 - ORDER BY created_at DESC - LIMIT 1 - ), - pairs AS ( - SELECT - a.views AS views1, - a.created_at AS created_at1, - b.views AS views2, - b.created_at AS created_at2, - (b.created_at - a.created_at) AS interval - FROM video_snapshot a - JOIN latest_snapshot b - ON a.aid = b.aid - AND a.created_at < b.created_at - ) - SELECT - views1, - created_at1, - views2, - created_at2 - FROM ( - SELECT - *, - ROW_NUMBER() OVER ( - ORDER BY - CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END, - CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END - ) AS rn - FROM pairs - ) ranked - WHERE rn = 1; - `, - [aid], - ); - if (queryResult.rows.length === 0) { - return true; - } - const recentViewsData = queryResult.rows[0]; - const time1 = parseTimestampFromPsql(recentViewsData.created_at1); - const time2 = parseTimestampFromPsql(recentViewsData.created_at2); - const intervalSec = (time2 - time1) / SECOND; - const views1 = recentViewsData.views1; - const views2 = recentViewsData.views2; - const viewsDiff = views2 - views1; - if (viewsDiff == 0) { - return false; - } - const nextMilestone = views2 >= 100000 ? 1000000 : 100000; - const expectedViewsDiff = nextMilestone - views2; - const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec; - return expectedIntervalSec <= 3 * DAY; -} diff --git a/lib/db/snapshotSchedule.ts b/lib/db/snapshotSchedule.ts index 8fd54fc..bd4e805 100644 --- a/lib/db/snapshotSchedule.ts +++ b/lib/db/snapshotSchedule.ts @@ -14,6 +14,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) { return res.rows.length > 0; } +export async function videoHasProcessingSchedule(client: Client, aid: number) { + const res = await client.queryObject<{ status: string }>( + `SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`, + [aid], + ); + return res.rows.length > 0; +} + interface Snapshot { created_at: number; views: number; @@ -156,9 +164,20 @@ function truncateTo5MinInterval(timestamp: Date): Date { } export async function getSnapshotsInNextSecond(client: Client) { - const res = await client.queryObject( - `SELECT * FROM cvsa.public.snapshot_schedule WHERE started_at <= NOW() + INTERVAL '1 second'`, - [], - ); + const query = ` + SELECT * + FROM snapshot_schedule + WHERE started_at + BETWEEN NOW() - INTERVAL '5 seconds' + AND NOW() + INTERVAL '1 seconds' + `; + const res = await client.queryObject(query, []); return res.rows; } + +export async function setSnapshotStatus(client: Client, id: number, status: string) { + return client.queryObject( + `UPDATE snapshot_schedule SET status = $2 WHERE id = $1`, + [id, status], + ); +} diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 86de99b..453ac65 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -6,27 +6,44 @@ import { getLatestSnapshot, getSnapshotsInNextSecond, scheduleSnapshot, + setSnapshotStatus, videoHasActiveSchedule, + videoHasProcessingSchedule, } from "lib/db/snapshotSchedule.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -import { HOUR, MINUTE } from "$std/datetime/constants.ts"; +import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import logger from "lib/log/logger.ts"; import { SnapshotQueue } from "lib/mq/index.ts"; +import { insertVideoSnapshot } from "../task/getVideoStats.ts"; +import { NetSchedulerError } from "../scheduler.ts"; +import { setBiliVideoStatus } from "../../db/allData.ts"; const priorityMap: { [key: string]: number } = { "milestone": 1, }; +const snapshotTypeToTaskMap: { [key: string]: string } = { + "milestone": "snapshotMilestoneVideo", + "normal": "snapshotVideo", +}; + export const snapshotTickWorker = async (_job: Job) => { const client = await db.connect(); try { const schedules = await getSnapshotsInNextSecond(client); for (const schedule of schedules) { let priority = 3; - if (schedule.type && priorityMap[schedule.type]) + if (schedule.type && priorityMap[schedule.type]) { priority = priorityMap[schedule.type]; - await SnapshotQueue.add("snapshotVideo", { aid: schedule.aid, priority }); + } + await SnapshotQueue.add("snapshotVideo", { + aid: schedule.aid, + id: schedule.id, + type: schedule.type ?? "normal", + }, { priority }); } + } catch (e) { + logger.error(e as Error); } finally { client.release(); } @@ -52,7 +69,7 @@ const getAdjustedShortTermETA = async (client: Client, aid: number) => { if (!latestSnapshot) return 0; const currentTimestamp = Date.now(); - const timeIntervals = [20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; + const timeIntervals = [3 * MINUTE, 20 * MINUTE, 1 * HOUR, 3 * HOUR, 6 * HOUR]; const DELTA = 0.00001; let minETAHours = Infinity; @@ -94,6 +111,42 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => { } }; -export const takeSnapshotForVideoWorker = async (_job: Job) => { - // TODO: implement +export const takeSnapshotForVideoWorker = async (job: Job) => { + const id = job.data.id; + const aid = job.data.aid; + const task = snapshotTypeToTaskMap[job.data.type] ?? "snapshotVideo"; + const client = await db.connect(); + try { + if (await videoHasProcessingSchedule(client, aid)) { + return `ALREADY_PROCESSING`; + } + await setSnapshotStatus(client, id, "processing"); + const stat = await insertVideoSnapshot(client, aid, task); + if (typeof stat === "number") { + await setBiliVideoStatus(client, aid, stat); + await setSnapshotStatus(client, id, "completed"); + return `BILI_STATUS_${stat}`; + } + const eta = await getAdjustedShortTermETA(client, aid); + if (eta > 72) return "ETA_TOO_LONG"; + const now = Date.now(); + const targetTime = now + eta * HOUR; + await setSnapshotStatus(client, id, "completed"); + await scheduleSnapshot(client, aid, "milestone", targetTime); + } catch (e) { + if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") { + logger.warn( + `No available proxy for aid ${job.data.aid}.`, + "mq", + "fn:takeSnapshotForVideoWorker", + ); + await setSnapshotStatus(client, id, "completed"); + await scheduleSnapshot(client, aid, "milestone", Date.now() + 5 * SECOND); + return; + } + logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker"); + await setSnapshotStatus(client, id, "failed"); + } finally { + client.release(); + } }; diff --git a/lib/mq/scheduler.ts b/lib/mq/scheduler.ts index 94c9361..a711a11 100644 --- a/lib/mq/scheduler.ts +++ b/lib/mq/scheduler.ts @@ -21,7 +21,7 @@ interface ProxiesMap { } type NetSchedulerErrorCode = - | "NO_AVAILABLE_PROXY" + | "NO_PROXY_AVAILABLE" | "PROXY_RATE_LIMITED" | "PROXY_NOT_FOUND" | "FETCH_ERROR" @@ -143,10 +143,10 @@ class NetScheduler { * @param {string} method - The HTTP method to use for the request. Default is "GET". * @returns {Promise} - A promise that resolves to the response body. * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - No available proxy currently: with error code NO_AVAILABLE_PROXY - * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED - * - The native `fetch` function threw an error: with error code FETCH_ERROR - * - The proxy type is not supported: with error code NOT_IMPLEMENTED + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` */ async request(url: string, task: string, method: string = "GET"): Promise { // find a available proxy @@ -156,7 +156,7 @@ class NetScheduler { return await this.proxyRequest(url, proxyName, task, method); } } - throw new NetSchedulerError("No available proxy currently.", "NO_AVAILABLE_PROXY"); + throw new NetSchedulerError("No proxy is available currently.", "NO_PROXY_AVAILABLE"); } /* @@ -168,10 +168,11 @@ class NetScheduler { * @param {boolean} force - If true, the request will be made even if the proxy is rate limited. Default is false. * @returns {Promise} - A promise that resolves to the response body. * @throws {NetSchedulerError} - The error will be thrown in following cases: - * - Proxy not found: with error code PROXY_NOT_FOUND - * - Proxy is under rate limit: with error code PROXY_RATE_LIMITED - * - The native `fetch` function threw an error: with error code FETCH_ERROR - * - The proxy type is not supported: with error code NOT_IMPLEMENTED + * - Proxy not found: with error code `PROXY_NOT_FOUND` + * - Proxy is under rate limit: with error code `PROXY_RATE_LIMITED` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` */ async proxyRequest( url: string, @@ -255,8 +256,6 @@ class NetScheduler { } private async alicloudFcRequest(url: string, region: string): Promise { - let rawOutput: null | Uint8Array = null; - let rawErr: null | Uint8Array = null; try { const decoder = new TextDecoder(); const output = await new Deno.Command("aliyun", { @@ -280,19 +279,9 @@ class NetScheduler { `CVSA-${region}`, ], }).output(); - rawOutput = output.stdout; - rawErr = output.stderr; const out = decoder.decode(output.stdout); const rawData = JSON.parse(out); if (rawData.statusCode !== 200) { - const fileId = randomUUID(); - await Deno.writeFile(`./logs/files/${fileId}.stdout`, output.stdout); - await Deno.writeFile(`./logs/files/${fileId}.stderr`, output.stderr); - logger.log( - `Returned non-200 status code. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, - "net", - "fn:alicloudFcRequest", - ); throw new NetSchedulerError( `Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`, "ALICLOUD_PROXY_ERR", @@ -301,16 +290,6 @@ class NetScheduler { return JSON.parse(JSON.parse(rawData.body)) as R; } } catch (e) { - if (rawOutput !== null || rawErr !== null) { - const fileId = randomUUID(); - rawOutput && await Deno.writeFile(`./logs/files/${fileId}.stdout`, rawOutput); - rawErr && await Deno.writeFile(`./logs/files/${fileId}.stderr`, rawErr); - logger.log( - `Error occurred. Raw ouput saved to ./logs/files/${fileId}.stdout/stderr`, - "net", - "fn:alicloudFcRequest", - ); - } logger.error(e as Error, "net", "fn:alicloudFcRequest"); throw new NetSchedulerError(`Unhandled error: Cannot proxy ${url} to ali-fc.`, "ALICLOUD_PROXY_ERR", e); } @@ -369,7 +348,8 @@ Execution order for setup: - Call after addProxy and addTask. Configures rate limiters specifically for tasks and their associated proxies. - Depends on tasks and proxies being defined to apply limiters correctly. 4. setProviderLimiter(providerName, config): - - Call after addProxy and addTask. Sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. + - Call after addProxy and addTask. + - It sets rate limiters at the provider level, affecting all proxies used by tasks of that provider. - Depends on tasks and proxies being defined to identify which proxies to apply provider-level limiters to. In summary: addProxy -> addTask -> (setTaskLimiter and/or setProviderLimiter). diff --git a/lib/mq/task/getVideoStats.ts b/lib/mq/task/getVideoStats.ts index 6f85035..8e3530a 100644 --- a/lib/mq/task/getVideoStats.ts +++ b/lib/mq/task/getVideoStats.ts @@ -1,12 +1,27 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { getVideoInfo } from "lib/net/getVideoInfo.ts"; +import { LatestSnapshotType } from "lib/db/schema.d.ts"; -export async function insertVideoStats(client: Client, aid: number, task: string) { +/* + * Fetch video stats from bilibili API and insert into database + * @returns {Promise} + * A number indicating the status code when receiving non-0 status code from bilibili, + * otherwise an VideoSnapshot object containing the video stats + * @throws {NetSchedulerError} - The error will be thrown in following cases: + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + */ +export async function insertVideoSnapshot( + client: Client, + aid: number, + task: string, +): Promise { const data = await getVideoInfo(aid, task); - const time = new Date().getTime(); if (typeof data == "number") { return data; } + const time = new Date().getTime(); const views = data.stat.view; const danmakus = data.stat.danmaku; const replies = data.stat.reply; @@ -14,14 +29,17 @@ export async function insertVideoStats(client: Client, aid: number, task: string const coins = data.stat.coin; const shares = data.stat.share; const favorites = data.stat.favorite; - await client.queryObject( - ` + + const query: string = ` INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - `, + `; + await client.queryObject( + query, [aid, views, danmakus, replies, likes, coins, shares, favorites], ); - return { + + const snapshot: LatestSnapshotType = { aid, views, danmakus, @@ -32,4 +50,6 @@ export async function insertVideoStats(client: Client, aid: number, task: string favorites, time, }; + + return snapshot; } diff --git a/lib/net/getVideoInfo.ts b/lib/net/getVideoInfo.ts index c35bf56..897fc62 100644 --- a/lib/net/getVideoInfo.ts +++ b/lib/net/getVideoInfo.ts @@ -2,6 +2,19 @@ import netScheduler from "lib/mq/scheduler.ts"; import { VideoInfoData, VideoInfoResponse } from "lib/net/bilibili.d.ts"; import logger from "lib/log/logger.ts"; +/* + * Fetch video metadata from bilibili API + * @param {number} aid - The video's aid + * @param {string} task - The task name used in scheduler. It can be one of the following: + * - snapshotVideo + * - getVideoInfo + * - snapshotMilestoneVideo + * @returns {Promise} VideoInfoData or the error code returned by bilibili API + * @throws {NetSchedulerError} - The error will be thrown in following cases: + * - No proxy is available currently: with error code `NO_PROXY_AVAILABLE` + * - The native `fetch` function threw an error: with error code `FETCH_ERROR` + * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` + */ export async function getVideoInfo(aid: number, task: string): Promise { const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`; const data = await netScheduler.request(url, task); diff --git a/src/worker.ts b/src/worker.ts index 9998569..ba2b510 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -40,7 +40,12 @@ const latestVideoWorker = new Worker( break; } }, - { connection: redis as ConnectionOptions, concurrency: 6, removeOnComplete: { count: 1440 }, removeOnFail: { count: 0 } }, + { + connection: redis as ConnectionOptions, + concurrency: 6, + removeOnComplete: { count: 1440 }, + removeOnFail: { count: 0 }, + }, ); latestVideoWorker.on("active", () => {