From 2c471059138c4fa030dd5f30137a9c765ff0eb33 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Thu, 17 Apr 2025 02:18:25 +0800 Subject: [PATCH] fix: incorrectly delayed milestone schedule due to UPDATE --- .../db/{allData.ts => bilibili_metadata.ts} | 0 packages/crawler/db/snapshotSchedule.ts | 43 +++++++++++++------ packages/crawler/mq/exec/classifyVideo.ts | 2 +- .../mq/exec/dispatchMilestoneSnapshots.ts | 2 +- packages/crawler/mq/exec/snapshotVideo.ts | 2 +- packages/crawler/mq/task/getVideoDetails.ts | 2 +- packages/crawler/mq/task/queueLatestVideo.ts | 2 +- 7 files changed, 35 insertions(+), 18 deletions(-) rename packages/crawler/db/{allData.ts => bilibili_metadata.ts} (100%) diff --git a/packages/crawler/db/allData.ts b/packages/crawler/db/bilibili_metadata.ts similarity index 100% rename from packages/crawler/db/allData.ts rename to packages/crawler/db/bilibili_metadata.ts diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index 72c8f3e..1642f61 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -4,6 +4,7 @@ import logger from "log/logger.ts"; import { MINUTE } from "$std/datetime/constants.ts"; import { redis } from "@core/db/redis.ts"; import { Redis } from "ioredis"; +import {parseTimestampFromPsql} from "../utils/formatTimestampToPostgre.ts"; const REDIS_KEY = "cvsa:snapshot_window_counts"; @@ -168,6 +169,20 @@ export async function getLatestSnapshot(client: Client, aid: number): Promise(query, [aid, type]); + return res.rows[0]; +} + /* * Creates a new snapshot schedule record. * @param client The database client. @@ -184,19 +199,21 @@ export async function scheduleSnapshot( let adjustedTime = new Date(targetTime); const hashActiveSchedule = await videoHasActiveScheduleWithType(client, aid, type); if (type == "milestone" && hashActiveSchedule) { - await client.queryObject(` - UPDATE snapshot_schedule - SET started_at = $1 - WHERE aid = $2 - AND type = 'milestone' - AND (status = 'pending' OR status = 'processing') - `, [adjustedTime, aid]); - logger.log( - `Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`, - "mq", - "fn:scheduleSnapshot", - ); - return; + const latestActiveSchedule = await getLatestActiveScheduleWithType(client, aid, type); + const latestScheduleStartedAt = new Date(parseTimestampFromPsql(latestActiveSchedule.started_at!)); + if (latestScheduleStartedAt < adjustedTime) { + await client.queryObject(` + UPDATE snapshot_schedule + SET started_at = $1 + WHERE id = $2 + `, [adjustedTime, latestActiveSchedule.id]); + logger.log( + `Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`, + "mq", + "fn:scheduleSnapshot", + ); + return; + } } if (hashActiveSchedule && !force) return; if (type !== "milestone" && type !== "new") { diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index f4c1f43..916a8ee 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -1,6 +1,6 @@ import { Job } from "bullmq"; import { db } from "db/init.ts"; -import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts"; +import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata.ts"; import Akari from "ml/akari.ts"; import { ClassifyVideoQueue } from "mq/index.ts"; import logger from "log/logger.ts"; diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 3be7d0e..1d8d8c7 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -17,7 +17,7 @@ export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise => if (eta > 144) continue; const now = Date.now(); const scheduledNextSnapshotDelay = eta * HOUR; - const maxInterval = 4 * HOUR; + const maxInterval = 1 * HOUR; const minInterval = 1 * SECOND; const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; diff --git a/packages/crawler/mq/exec/snapshotVideo.ts b/packages/crawler/mq/exec/snapshotVideo.ts index dbb0fbc..488f729 100644 --- a/packages/crawler/mq/exec/snapshotVideo.ts +++ b/packages/crawler/mq/exec/snapshotVideo.ts @@ -5,7 +5,7 @@ import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/ import logger from "log/logger.ts"; import { HOUR, MINUTE, SECOND } from "@std/datetime"; import { lockManager } from "mq/lockManager.ts"; -import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts"; +import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { getSongsPublihsedAt } from "db/songs.ts"; import { getAdjustedShortTermETA } from "mq/scheduling.ts"; diff --git a/packages/crawler/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts index 72addf5..40224b0 100644 --- a/packages/crawler/mq/task/getVideoDetails.ts +++ b/packages/crawler/mq/task/getVideoDetails.ts @@ -3,7 +3,7 @@ import { getVideoDetails } from "net/getVideoDetails.ts"; import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts"; import logger from "log/logger.ts"; import { ClassifyVideoQueue } from "mq/index.ts"; -import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts"; +import { userExistsInBiliUsers, videoExistsInAllData } from "../../db/bilibili_metadata.ts"; import { HOUR, SECOND } from "@std/datetime"; export async function insertVideoInfo(client: Client, aid: number) { diff --git a/packages/crawler/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts index 47f8bcc..6327860 100644 --- a/packages/crawler/mq/task/queueLatestVideo.ts +++ b/packages/crawler/mq/task/queueLatestVideo.ts @@ -1,6 +1,6 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { getLatestVideoAids } from "net/getLatestVideoAids.ts"; -import { videoExistsInAllData } from "db/allData.ts"; +import { videoExistsInAllData } from "../../db/bilibili_metadata.ts"; import { sleep } from "utils/sleep.ts"; import { SECOND } from "@std/datetime"; import logger from "log/logger.ts";