diff --git a/packages/crawler/db/eta.ts b/packages/crawler/db/eta.ts new file mode 100644 index 0000000..a8109d2 --- /dev/null +++ b/packages/crawler/db/eta.ts @@ -0,0 +1,10 @@ +import { Psql } from "@core/db/psql"; + +export async function updateETA(sql: Psql, aid: number, eta: number, speed: number, views: number) { + return sql` + INSERT INTO eta (aid, eta, speed, current_views) + VALUES (${aid}, ${eta}, ${speed}, ${views}) + ON CONFLICT (aid) + DO UPDATE SET eta = ${eta}, speed = ${speed}, current_views = ${views} + `; +} diff --git a/packages/crawler/mq/exec/takeBulkSnapshot.ts b/packages/crawler/mq/exec/takeBulkSnapshot.ts index 7b7ca9b..7b52ce6 100644 --- a/packages/crawler/mq/exec/takeBulkSnapshot.ts +++ b/packages/crawler/mq/exec/takeBulkSnapshot.ts @@ -2,6 +2,7 @@ import { Job } from "bullmq"; import { bulkScheduleSnapshot, bulkSetSnapshotStatus, + getLatestSnapshot, scheduleSnapshot, snapshotScheduleExists } from "db/snapshotSchedule"; @@ -12,6 +13,8 @@ import { HOUR, MINUTE, SECOND } from "@core/lib"; import { getRegularSnapshotInterval } from "mq/task/regularSnapshotInterval"; import { SnapshotScheduleType } from "@core/db/schema"; import { sql } from "@core/db/dbNew"; +import { updateETA } from "db/eta"; +import { closetMilestone } from "./snapshotTick"; export const takeBulkSnapshotForVideosWorker = async (job: Job) => { const schedules: SnapshotScheduleType[] = job.data.schedules; @@ -43,6 +46,15 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => { const coins = stat.coin; const shares = stat.share; const favorites = stat.collect; + const currentSnapshot = await getLatestSnapshot(sql, aid); + const DELTA = 0.0001; + const viewsDiff = views - currentSnapshot.views; + const hoursDiff = (new Date().getTime() - currentSnapshot.created_at) / HOUR; + const speed = viewsDiff / (hoursDiff + DELTA); + const target = closetMilestone(views); + const viewsToIncrease = target - views; + const eta = viewsToIncrease / (speed + DELTA); + await updateETA(sql, aid, eta, speed, views); await sql` INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) VALUES ( diff --git a/packages/crawler/mq/scheduling.ts b/packages/crawler/mq/scheduling.ts index 8012749..31066f0 100644 --- a/packages/crawler/mq/scheduling.ts +++ b/packages/crawler/mq/scheduling.ts @@ -3,6 +3,7 @@ import { truncate } from "utils/truncate"; import { closetMilestone } from "./exec/snapshotTick"; import { HOUR, MINUTE } from "@core/lib"; import type { Psql } from "@core/db/psql.d"; +import { updateETA } from "db/eta"; const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); @@ -34,8 +35,15 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { const currentTimestamp = new Date().getTime(); const timeIntervals = [3 * MINUTE, 20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 72 * HOUR]; + const originalWeight = [3, 5, 3, 2, 2, 3]; + const weight = originalWeight.map((x) => x / originalWeight.reduce((a, b) => a + b, 0)); const DELTA = 0.00001; let minETAHours = Infinity; + let avgSpeed = 0; + + const target = closetMilestone(latestSnapshot.views); + const viewsToIncrease = target - latestSnapshot.views; + const factor = truncate(getFactor(viewsToIncrease), 4.5, 100); for (const timeInterval of timeIntervals) { const date = new Date(currentTimestamp - timeInterval); @@ -45,11 +53,8 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { const viewsDiff = latestSnapshot.views - snapshot.views; if (viewsDiff <= 0) continue; const speed = viewsDiff / (hoursDiff + DELTA); - const target = closetMilestone(latestSnapshot.views); - const viewsToIncrease = target - latestSnapshot.views; + avgSpeed += speed * weight[timeIntervals.indexOf(timeInterval)]; const eta = viewsToIncrease / (speed + DELTA); - let factor = getFactor(viewsToIncrease); - factor = truncate(factor, 4.5, 100); const adjustedETA = eta / factor; if (adjustedETA < minETAHours) { minETAHours = adjustedETA; @@ -60,5 +65,9 @@ export const getAdjustedShortTermETA = async (sql: Psql, aid: number) => { minETAHours = Infinity; } + const avgETAHours = viewsToIncrease / (avgSpeed + DELTA); + + await updateETA(sql, aid, avgETAHours, avgSpeed, latestSnapshot.views); + return minETAHours; };