diff --git a/packages/core/net/delegate.ts b/packages/core/net/delegate.ts index ef0411b..eec7f73 100644 --- a/packages/core/net/delegate.ts +++ b/packages/core/net/delegate.ts @@ -16,6 +16,7 @@ import { Readable } from "stream"; interface FCResponse { statusCode: number; body: string; + serverTime: number; } interface Proxy { @@ -167,7 +168,10 @@ class NetworkDelegate { * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` * - The proxy type is not supported: with error code `NOT_IMPLEMENTED` */ - async request(url: string, task: string): Promise { + async request(url: string, task: string): Promise<{ + data: R; + time: number; + }> { // find a available proxy const proxiesNames = this.getTaskProxies(task); for (const proxyName of shuffleArray(proxiesNames)) { @@ -203,7 +207,10 @@ class NetworkDelegate { proxyName: string, task: string, force: boolean = false - ): Promise { + ): Promise<{ + data: R; + time: number; + }> { const proxy = this.proxies[proxyName]; if (!proxy) { throw new NetSchedulerError(`Proxy "${proxyName}" not found`, "PROXY_NOT_FOUND"); @@ -214,7 +221,10 @@ class NetworkDelegate { return result; } - private async makeRequest(url: string, proxy: Proxy): Promise { + private async makeRequest(url: string, proxy: Proxy): Promise<{ + data: R; + time: number; + }> { switch (proxy.type) { case "native": return await this.nativeRequest(url); @@ -228,7 +238,10 @@ class NetworkDelegate { } } - private async nativeRequest(url: string): Promise { + private async nativeRequest(url: string): Promise<{ + data: R; + time: number; + }> { try { const controller = new AbortController(); const timeout = setTimeout(() => controller.abort(), 10 * SECOND); @@ -239,13 +252,26 @@ class NetworkDelegate { clearTimeout(timeout); - return (await response.json()) as R; + const start = Date.now(); + const data = await response.json(); + const end = Date.now(); + const serverTime = start + (end - start) / 2; + return { + data: data as R, + time: serverTime + }; } catch (e) { throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e); } } - private async alicloudFcRequest(url: string, region: string): Promise { + private async alicloudFcRequest( + url: string, + region: string + ): Promise<{ + data: R; + time: number; + }> { try { const client = getAlicloudClient(region); const bodyStream = Stream.readFromString(JSON.stringify({ url: url })); @@ -275,7 +301,10 @@ class NetworkDelegate { "ALICLOUD_PROXY_ERR" ); } else { - return JSON.parse(rawData.body) as R; + return { + data: JSON.parse(rawData.body) as R, + time: rawData.serverTime + }; } } catch (e) { logger.error(e as Error, "net", "fn:alicloudFcRequest"); diff --git a/packages/core/net/getVideoInfo.ts b/packages/core/net/getVideoInfo.ts index b842231..cfc6d3b 100644 --- a/packages/core/net/getVideoInfo.ts +++ b/packages/core/net/getVideoInfo.ts @@ -15,15 +15,27 @@ import logger from "@core/log"; * - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` */ -export async function getVideoInfo(aid: number, task: string): Promise { +export async function getVideoInfo( + aid: number, + task: string +): Promise< + | { + data: VideoInfoData; + time: number; + } + | number +> { const url = `https://api.bilibili.com/x/web-interface/view?aid=${aid}`; - const data = await networkDelegate.request(url, task); + const { data, time } = await networkDelegate.request(url, task); const errMessage = `Error fetching metadata for ${aid}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo"); return data.code; } - return data.data; + return { + data: data.data, + time: time + }; } /* @@ -39,9 +51,12 @@ export async function getVideoInfo(aid: number, task: string): Promise { +export async function getVideoInfoByBV( + bvid: string, + task: string +): Promise { const url = `https://api.bilibili.com/x/web-interface/view?bvid=${bvid}`; - const data = await networkDelegate.request(url, task); + const { data } = await networkDelegate.request(url, task); const errMessage = `Error fetching metadata for ${bvid}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfoByBV"); diff --git a/packages/core/test/netDelegate.test.ts b/packages/core/test/netDelegate.test.ts index 1f9ad53..d531b3c 100644 --- a/packages/core/test/netDelegate.test.ts +++ b/packages/core/test/netDelegate.test.ts @@ -3,7 +3,7 @@ import { test, expect, describe } from "bun:test"; describe("proxying requests", () => { test("Alibaba Cloud FC", async () => { - const res = await networkDelegate.request("https://postman-echo.com/get", "test") as any; + const { res } = await networkDelegate.request("https://postman-echo.com/get", "test") as any; expect(res.headers.referer).toBe('https://www.bilibili.com/'); }); }); diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index 00ec8c4..2a63060 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -30,12 +30,13 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { } aidsToFetch.push(aid); } - const data = await bulkGetVideoStats(aidsToFetch); - if (typeof data === "number") { + const r = await bulkGetVideoStats(aidsToFetch); + if (typeof r === "number") { await bulkSetSnapshotStatus(sql, ids, "failed"); await bulkScheduleSnapshot(sql, aidsToFetch, "normal", Date.now() + 15 * SECOND); - return `GET_BILI_STATUS_${data}`; + return `GET_BILI_STATUS_${r}`; } + const { data, time } = r; for (const video of data) { const aid = video.id; const stat = video.cnt_info; @@ -58,7 +59,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { await updateETA(sql, aid, eta, speed, views); } await sql` - INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites, created_at) VALUES ( ${aid}, ${views}, @@ -67,7 +68,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { ${likes}, ${coins}, ${shares}, - ${favorites} + ${favorites}, + ${new Date(time).toUTCString()} ) `; diff --git a/packages/crawler/mq/task/getVideoStats.ts b/packages/crawler/mq/task/getVideoStats.ts index 20b068d..bf31bf6 100644 --- a/packages/crawler/mq/task/getVideoStats.ts +++ b/packages/crawler/mq/task/getVideoStats.ts @@ -24,12 +24,16 @@ export interface SnapshotNumber { * - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` */ -export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): Promise { - const data = await getVideoInfo(aid, task); - if (typeof data == "number") { - return data; +export async function insertVideoSnapshot( + sql: Psql, + aid: number, + task: string +): Promise { + const r = await getVideoInfo(aid, task); + if (typeof r == "number") { + return r; } - const time = new Date().getTime(); + const { data, time } = r; const views = data.stat.view; const danmakus = data.stat.danmaku; const replies = data.stat.reply; @@ -39,8 +43,8 @@ export async function insertVideoSnapshot(sql: Psql, aid: number, task: string): const favorites = data.stat.favorite; await sql` - INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) - VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites}) + INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites, created_at) + VALUES (${aid}, ${views}, ${danmakus}, ${replies}, ${likes}, ${coins}, ${shares}, ${favorites}, ${new Date(time).toUTCString()}) `; logger.log(`Taken snapshot for video ${aid}.`, "net", "fn:insertVideoSnapshot"); diff --git a/packages/crawler/net/bulkGetVideoStats.ts b/packages/crawler/net/bulkGetVideoStats.ts index 049ec96..aa71ddd 100644 --- a/packages/crawler/net/bulkGetVideoStats.ts +++ b/packages/crawler/net/bulkGetVideoStats.ts @@ -11,17 +11,29 @@ import logger from "@core/log"; * - The native `fetch` function threw an error: with error code `FETCH_ERROR` * - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR` */ -export async function bulkGetVideoStats(aids: number[]): Promise { +export async function bulkGetVideoStats(aids: number[]): Promise< + | { + data: MediaListInfoData; + time: number; + } + | number +> { // TODO: https://api.bilibili.com/x/v3/fav/resource/infos?resources=20:2 let url = `https://api.bilibili.com/medialist/gateway/base/resource/infos?resources=`; for (const aid of aids) { url += `${aid}:2,`; } - const data = await networkDelegate.request(url, "bulkSnapshot"); + const { data, time } = await networkDelegate.request( + url, + "bulkSnapshot" + ); const errMessage = `Error fetching metadata for aid list: ${aids.join(",")}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo"); return data.code; } - return data.data; + return { + data: data.data, + time: time + }; } diff --git a/packages/crawler/net/getLatestVideoAids.ts b/packages/crawler/net/getLatestVideoAids.ts index 12ba8dd..bbb8673 100644 --- a/packages/crawler/net/getLatestVideoAids.ts +++ b/packages/crawler/net/getLatestVideoAids.ts @@ -8,7 +8,7 @@ export async function getLatestVideoAids(page: number = 1, pageSize: number = 10 const range = `${startFrom}-${endTo}`; const errMessage = `Error fetching latest aid for ${range}:`; const url = `https://api.bilibili.com/x/web-interface/newlist?rid=30&ps=${pageSize}&pn=${page}`; - const data = await networkDelegate.request(url, "getLatestVideos"); + const { data } = await networkDelegate.request(url, "getLatestVideos"); if (data.code != 0) { logger.error(errMessage + data.message, "net", "getLastestVideos"); return []; diff --git a/packages/crawler/net/getVideoDetails.ts b/packages/crawler/net/getVideoDetails.ts index 3a6f515..414b421 100644 --- a/packages/crawler/net/getVideoDetails.ts +++ b/packages/crawler/net/getVideoDetails.ts @@ -4,7 +4,7 @@ import logger from "@core/log"; export async function getVideoDetails(aid: number, archive: boolean = false): Promise { const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; - const data = await networkDelegate.request(url, archive ? "" : "getVideoInfo"); + const { data } = await networkDelegate.request(url, archive ? "" : "getVideoInfo"); const errMessage = `Error fetching metadata for ${aid}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");