From 7104a95af93438eabf116dde42d57bb81f8cff8f Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 15 Mar 2025 21:27:19 +0800 Subject: [PATCH 01/10] ref: rename table all_data, bili_user to bilibili_metadata, bilibili_user --- lib/db/allData.ts | 14 +++++++------- lib/mq/task/collectSongs.ts | 6 +++--- lib/mq/task/getVideoDetails.ts | 6 +++--- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/db/allData.ts b/lib/db/allData.ts index 8e30780..0c6a42d 100644 --- a/lib/db/allData.ts +++ b/lib/db/allData.ts @@ -3,19 +3,19 @@ import { AllDataType, BiliUserType } from "lib/db/schema.d.ts"; import { modelVersion } from "lib/ml/filter_inference.ts"; export async function videoExistsInAllData(client: Client, aid: number) { - return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid]) + return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = $1)`, [aid]) .then((result) => result.rows[0].exists); } export async function userExistsInBiliUsers(client: Client, uid: number) { - return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bili_user WHERE uid = $1)`, [ + return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = $1)`, [ uid, ]); } export async function getUnlabelledVideos(client: Client) { const queryResult = await client.queryObject<{ aid: number }>( - `SELECT a.aid FROM all_data a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`, + `SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL`, ); return queryResult.rows.map((row) => row.aid); } @@ -29,14 +29,14 @@ export async function insertVideoLabel(client: Client, aid: number, label: numbe export async function getVideoInfoFromAllData(client: Client, aid: number) { const queryResult = await client.queryObject( - `SELECT * FROM all_data WHERE aid = $1`, + `SELECT * FROM bilibili_metadata WHERE aid = $1`, [aid], ); const row = queryResult.rows[0]; let authorInfo = ""; if (row.uid && await userExistsInBiliUsers(client, row.uid)) { const q = await client.queryObject( - `SELECT * FROM bili_user WHERE uid = $1`, + `SELECT * FROM bilibili_user WHERE uid = $1`, [row.uid], ); const userRow = q.rows[0]; @@ -56,8 +56,8 @@ export async function getUnArchivedBiliUsers(client: Client) { const queryResult = await client.queryObject<{ uid: number }>( ` SELECT ad.uid - FROM all_data ad - LEFT JOIN bili_user bu ON ad.uid = bu.uid + FROM bilibili_metadata ad + LEFT JOIN bilibili_user bu ON ad.uid = bu.uid WHERE bu.uid IS NULL; `, [], diff --git a/lib/mq/task/collectSongs.ts b/lib/mq/task/collectSongs.ts index 04e033d..7a7daad 100644 --- a/lib/mq/task/collectSongs.ts +++ b/lib/mq/task/collectSongs.ts @@ -18,9 +18,9 @@ export async function insertIntoSongs(client: Client, aid: number) { INSERT INTO songs (aid, bvid, published_at, duration) VALUES ( $1, - (SELECT bvid FROM all_data WHERE aid = $1), - (SELECT published_at FROM all_data WHERE aid = $1), - (SELECT duration FROM all_data WHERE aid = $1) + (SELECT bvid FROM bilibili_metadata WHERE aid = $1), + (SELECT published_at FROM bilibili_metadata WHERE aid = $1), + (SELECT duration FROM bilibili_metadata WHERE aid = $1) ) ON CONFLICT DO NOTHING `, diff --git a/lib/mq/task/getVideoDetails.ts b/lib/mq/task/getVideoDetails.ts index ead8dd0..1f4287b 100644 --- a/lib/mq/task/getVideoDetails.ts +++ b/lib/mq/task/getVideoDetails.ts @@ -24,19 +24,19 @@ export async function insertVideoInfo(client: Client, aid: number) { const published_at = formatTimestampToPsql(data.View.pubdate); const duration = data.View.duration; await client.queryObject( - `INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at, duration) + `INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)`, [aid, bvid, desc, uid, tags, title, published_at, duration], ); const userExists = await userExistsInBiliUsers(client, aid); if (!userExists) { await client.queryObject( - `INSERT INTO bili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`, + `INSERT INTO bilibili_user (uid, username, "desc", fans) VALUES ($1, $2, $3, $4)`, [uid, data.View.owner.name, data.Card.card.sign, data.Card.follower], ); } else { await client.queryObject( - `UPDATE bili_user SET fans = $1 WHERE uid = $2`, + `UPDATE bilibili_user SET fans = $1 WHERE uid = $2`, [data.Card.follower, uid], ); } From 0ff1c78dcc22e0782a65af85d6b9d74c53097c40 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 16 Mar 2025 14:00:49 +0800 Subject: [PATCH 02/10] fix: incorrect timestamp unit when inserting to database --- lib/mq/task/getVideoDetails.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mq/task/getVideoDetails.ts b/lib/mq/task/getVideoDetails.ts index 1f4287b..cff4890 100644 --- a/lib/mq/task/getVideoDetails.ts +++ b/lib/mq/task/getVideoDetails.ts @@ -21,7 +21,7 @@ export async function insertVideoInfo(client: Client, aid: number) { .filter((tag) => tag.tag_type in ["old_channel", "topic"]) .map((tag) => tag.tag_name).join(","); const title = data.View.title; - const published_at = formatTimestampToPsql(data.View.pubdate); + const published_at = formatTimestampToPsql(data.View.pubdate * 1000); const duration = data.View.duration; await client.queryObject( `INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration) From a9ac8de5472087a914cb32c2a09261b7dae060ae Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 16 Mar 2025 14:23:11 +0800 Subject: [PATCH 03/10] fix: unhandled timezone mismatch when inserting to database --- lib/mq/task/getVideoDetails.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/mq/task/getVideoDetails.ts b/lib/mq/task/getVideoDetails.ts index cff4890..51a1876 100644 --- a/lib/mq/task/getVideoDetails.ts +++ b/lib/mq/task/getVideoDetails.ts @@ -4,6 +4,7 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts"; import logger from "lib/log/logger.ts"; import { ClassifyVideoQueue } from "lib/mq/index.ts"; import { userExistsInBiliUsers, videoExistsInAllData } from "lib/db/allData.ts"; +import { HOUR, SECOND } from "$std/datetime/constants.ts"; export async function insertVideoInfo(client: Client, aid: number) { const videoExists = await videoExistsInAllData(client, aid); @@ -21,7 +22,7 @@ export async function insertVideoInfo(client: Client, aid: number) { .filter((tag) => tag.tag_type in ["old_channel", "topic"]) .map((tag) => tag.tag_name).join(","); const title = data.View.title; - const published_at = formatTimestampToPsql(data.View.pubdate * 1000); + const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); const duration = data.View.duration; await client.queryObject( `INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration) From cd8aa826e125ac54efee09b8d76471f6d3d05144 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 17 Mar 2025 00:33:28 +0800 Subject: [PATCH 04/10] fix: prevent videos from being crawled for too long --- lib/db/snapshot.ts | 19 ++++++++++++++++++- lib/mq/exec/snapshotTick.ts | 20 +++++++------------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index 663a628..5921059 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -71,7 +71,7 @@ export async function getSongSnapshotCount(client: Client, aid: number) { } export async function getShortTermEtaPrediction(client: Client, aid: number) { - const queryResult = await client.queryObject<{eta: number}>( + const queryResult = await client.queryObject<{ eta: number }>( ` WITH old_snapshot AS ( SELECT created_at, views @@ -120,6 +120,23 @@ export async function getShortTermEtaPrediction(client: Client, aid: number) { return queryResult.rows[0].eta; } +export async function getIntervalFromLastSnapshotToNow(client: Client, aid: number) { + const queryResult = await client.queryObject<{ interval: number }>( + ` + SELECT EXTRACT(EPOCH FROM (NOW() - created_at)) AS interval + FROM video_snapshot + WHERE aid = $1 + ORDER BY created_at DESC + LIMIT 1; + `, + [aid], + ); + if (queryResult.rows.length === 0) { + return null; + } + return queryResult.rows[0].interval; +} + export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) { const count = await getSongSnapshotCount(client, aid); if (count < 2) { diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index 12443ff..bbc7205 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -1,8 +1,9 @@ import { Job } from "bullmq"; -import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { db } from "lib/db/init.ts"; import { +getIntervalFromLastSnapshotToNow, getShortTermEtaPrediction, getSongsNearMilestone, getUnsnapshotedSongs, @@ -55,19 +56,12 @@ async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: So let i = 0; for (const snapshot of vidoesNearMilestone) { if (await snapshotScheduled(snapshot.aid)) { - logger.silly( - `Video ${snapshot.aid} is already scheduled for snapshot`, - "mq", - "fn:processMilestoneSnapshots", - ); continue; } - if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) { - logger.silly( - `Video ${snapshot.aid} is not eligible for milestone snapshot`, - "mq", - "fn:processMilestoneSnapshots", - ); + const timeFromLastSnapshot = await getIntervalFromLastSnapshotToNow(client, snapshot.aid); + const lastSnapshotLessThan8Hrs = timeFromLastSnapshot && timeFromLastSnapshot * SECOND < 8 * HOUR; + const notEligible = await songEligibleForMilestoneSnapshot(client, snapshot.aid); + if (notEligible && lastSnapshotLessThan8Hrs) { continue; } const factor = Math.floor(i / 8); @@ -143,7 +137,7 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { eta = viewsToIncrease / (incrementSpeed + DELTA); } const scheduledNextSnapshotDelay = eta * SECOND / 3; - const maxInterval = 20 * MINUTE; + const maxInterval = 60 * MINUTE; const minInterval = 1 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); await SnapshotQueue.add("snapshotMilestoneVideo", { From 00b52c01f79699ed51e4691c4a28c077a4ac1bc2 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Fri, 21 Mar 2025 20:51:34 +0800 Subject: [PATCH 05/10] fix: unexpected column `bvid` when inserting to `songs` table --- lib/mq/task/collectSongs.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lib/mq/task/collectSongs.ts b/lib/mq/task/collectSongs.ts index 7a7daad..9c49823 100644 --- a/lib/mq/task/collectSongs.ts +++ b/lib/mq/task/collectSongs.ts @@ -15,10 +15,9 @@ export async function collectSongs(client: Client) { export async function insertIntoSongs(client: Client, aid: number) { await client.queryObject( ` - INSERT INTO songs (aid, bvid, published_at, duration) + INSERT INTO songs (aid, published_at, duration) VALUES ( $1, - (SELECT bvid FROM bilibili_metadata WHERE aid = $1), (SELECT published_at FROM bilibili_metadata WHERE aid = $1), (SELECT duration FROM bilibili_metadata WHERE aid = $1) ) From 8158ce10c02b1c21879ad55970ca0b7fd0fbdfc8 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Fri, 21 Mar 2025 21:06:01 +0800 Subject: [PATCH 06/10] fix: inserting videos into `songs` table regardless of classified label --- lib/mq/exec/classifyVideo.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mq/exec/classifyVideo.ts b/lib/mq/exec/classifyVideo.ts index 3541892..53aadd9 100644 --- a/lib/mq/exec/classifyVideo.ts +++ b/lib/mq/exec/classifyVideo.ts @@ -26,7 +26,7 @@ export const classifyVideoWorker = async (job: Job) => { await insertVideoLabel(client, aid, label); const exists = await aidExistsInSongs(client, aid); - if (!exists) { + if (!exists && label !== 0) { await insertIntoSongs(client, aid); } From fabb77d98d8459d91592cf6ada0f22ce47fb0ffb Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 22 Mar 2025 00:28:47 +0800 Subject: [PATCH 07/10] fix: inefficient SQL query for getting songs close to milestone --- lib/db/snapshot.ts | 53 +++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/lib/db/snapshot.ts b/lib/db/snapshot.ts index 5921059..9f8cee4 100644 --- a/lib/db/snapshot.ts +++ b/lib/db/snapshot.ts @@ -5,40 +5,31 @@ import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts"; export async function getSongsNearMilestone(client: Client) { const queryResult = await client.queryObject(` - WITH max_views_per_aid AS ( - -- 找出每个 aid 的最大 views 值,并确保 aid 存在于 songs 表中 - SELECT - vs.aid, - MAX(vs.views) AS max_views - FROM + WITH filtered_snapshots AS ( + SELECT + vs.* + FROM video_snapshot vs - INNER JOIN - songs s - ON - vs.aid = s.aid - GROUP BY - vs.aid + WHERE + (vs.views >= 90000 AND vs.views < 100000) OR + (vs.views >= 900000 AND vs.views < 1000000) ), - filtered_max_views AS ( - -- 筛选出满足条件的最大 views - SELECT - aid, - max_views - FROM - max_views_per_aid - WHERE - (max_views >= 90000 AND max_views < 100000) OR - (max_views >= 900000 AND max_views < 1000000) + ranked_snapshots AS ( + SELECT + fs.*, + ROW_NUMBER() OVER (PARTITION BY fs.aid ORDER BY fs.created_at DESC) as rn, + MAX(fs.views) OVER (PARTITION BY fs.aid) as max_views_per_aid + FROM + filtered_snapshots fs + INNER JOIN + songs s ON fs.aid = s.aid ) - -- 获取符合条件的完整行数据 - SELECT - vs.* - FROM - video_snapshot vs - INNER JOIN - filtered_max_views fmv - ON - vs.aid = fmv.aid AND vs.views = fmv.max_views + SELECT + rs.id, rs.created_at, rs.views, rs.coins, rs.likes, rs.favorites, rs.shares, rs.danmakus, rs.aid, rs.replies + FROM + ranked_snapshots rs + WHERE + rs.rn = 1; `); return queryResult.rows.map((row) => { return { From 1895d601d96480cae1880fe9a7bcb639cc2ca027 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 22 Mar 2025 00:40:00 +0800 Subject: [PATCH 08/10] update: dynamic delay factor for snapshotMilestoneVideo --- lib/mq/exec/snapshotTick.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/mq/exec/snapshotTick.ts b/lib/mq/exec/snapshotTick.ts index bbc7205..7d4d980 100644 --- a/lib/mq/exec/snapshotTick.ts +++ b/lib/mq/exec/snapshotTick.ts @@ -106,6 +106,8 @@ export const snapshotTickWorker = async (_job: Job) => { } }; +const log = (a: number, b: number = 10) => Math.log(a) / Math.log(b); + export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { const client = await db.connect(); await setSnapshotScheduled(job.data.aid, true, 20 * 60); @@ -128,6 +130,7 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { return; } let eta = await getShortTermEtaPrediction(client, aid); + let factor = 3; if (eta === null) { const DELTA = 0.001; const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND; @@ -135,8 +138,9 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => { const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA); const viewsToIncrease = nextMilestone - stat.views; eta = viewsToIncrease / (incrementSpeed + DELTA); + factor = log(2.97 / log(viewsToIncrease + 1), 1.14); } - const scheduledNextSnapshotDelay = eta * SECOND / 3; + const scheduledNextSnapshotDelay = eta * SECOND / factor; const maxInterval = 60 * MINUTE; const minInterval = 1 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); From 559c63b43410cf0074c1518e767e0ff35e0531f5 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 22 Mar 2025 00:42:37 +0800 Subject: [PATCH 09/10] update: more beautiful time interval formatting --- lib/utils/formatSeconds.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/utils/formatSeconds.ts b/lib/utils/formatSeconds.ts index ffabb22..694f94c 100644 --- a/lib/utils/formatSeconds.ts +++ b/lib/utils/formatSeconds.ts @@ -3,7 +3,7 @@ export const formatSeconds = (seconds: number) => { return `${(seconds).toFixed(1)}s`; } if (seconds < 3600) { - return `${Math.floor(seconds / 60)}m${seconds % 60}s`; + return `${Math.floor(seconds / 60)}m${(seconds % 60).toFixed(1)}s`; } return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`; }; From e5534cda24f38b78d5058de5d320b4091bd56c0d Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sat, 22 Mar 2025 00:58:36 +0800 Subject: [PATCH 10/10] fix: incorrect filter condition that causes empty tags --- lib/mq/task/getVideoDetails.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/mq/task/getVideoDetails.ts b/lib/mq/task/getVideoDetails.ts index 51a1876..ea5f903 100644 --- a/lib/mq/task/getVideoDetails.ts +++ b/lib/mq/task/getVideoDetails.ts @@ -19,7 +19,7 @@ export async function insertVideoInfo(client: Client, aid: number) { const desc = data.View.desc; const uid = data.View.owner.mid; const tags = data.Tags - .filter((tag) => tag.tag_type in ["old_channel", "topic"]) + .filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) .map((tag) => tag.tag_name).join(","); const title = data.View.title; const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);