fix: incorrectly delayed milestone schedule due to UPDATE
This commit is contained in:
parent
6eaaf921d6
commit
2c47105913
@ -4,6 +4,7 @@ import logger from "log/logger.ts";
|
|||||||
import { MINUTE } from "$std/datetime/constants.ts";
|
import { MINUTE } from "$std/datetime/constants.ts";
|
||||||
import { redis } from "@core/db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
import { Redis } from "ioredis";
|
import { Redis } from "ioredis";
|
||||||
|
import {parseTimestampFromPsql} from "../utils/formatTimestampToPostgre.ts";
|
||||||
|
|
||||||
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
||||||
|
|
||||||
@ -168,6 +169,20 @@ export async function getLatestSnapshot(client: Client, aid: number): Promise<Sn
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getLatestActiveScheduleWithType(client: Client, aid: number, type: string) {
|
||||||
|
const query: string = `
|
||||||
|
SELECT *
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE aid = $1
|
||||||
|
AND type = $2
|
||||||
|
AND (status = 'pending' OR status = 'processing')
|
||||||
|
ORDER BY started_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
`
|
||||||
|
const res = await client.queryObject<SnapshotScheduleType>(query, [aid, type]);
|
||||||
|
return res.rows[0];
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Creates a new snapshot schedule record.
|
* Creates a new snapshot schedule record.
|
||||||
* @param client The database client.
|
* @param client The database client.
|
||||||
@ -184,19 +199,21 @@ export async function scheduleSnapshot(
|
|||||||
let adjustedTime = new Date(targetTime);
|
let adjustedTime = new Date(targetTime);
|
||||||
const hashActiveSchedule = await videoHasActiveScheduleWithType(client, aid, type);
|
const hashActiveSchedule = await videoHasActiveScheduleWithType(client, aid, type);
|
||||||
if (type == "milestone" && hashActiveSchedule) {
|
if (type == "milestone" && hashActiveSchedule) {
|
||||||
await client.queryObject(`
|
const latestActiveSchedule = await getLatestActiveScheduleWithType(client, aid, type);
|
||||||
UPDATE snapshot_schedule
|
const latestScheduleStartedAt = new Date(parseTimestampFromPsql(latestActiveSchedule.started_at!));
|
||||||
SET started_at = $1
|
if (latestScheduleStartedAt < adjustedTime) {
|
||||||
WHERE aid = $2
|
await client.queryObject(`
|
||||||
AND type = 'milestone'
|
UPDATE snapshot_schedule
|
||||||
AND (status = 'pending' OR status = 'processing')
|
SET started_at = $1
|
||||||
`, [adjustedTime, aid]);
|
WHERE id = $2
|
||||||
logger.log(
|
`, [adjustedTime, latestActiveSchedule.id]);
|
||||||
`Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`,
|
logger.log(
|
||||||
"mq",
|
`Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`,
|
||||||
"fn:scheduleSnapshot",
|
"mq",
|
||||||
);
|
"fn:scheduleSnapshot",
|
||||||
return;
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (hashActiveSchedule && !force) return;
|
if (hashActiveSchedule && !force) return;
|
||||||
if (type !== "milestone" && type !== "new") {
|
if (type !== "milestone" && type !== "new") {
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { db } from "db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "db/allData.ts";
|
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata.ts";
|
||||||
import Akari from "ml/akari.ts";
|
import Akari from "ml/akari.ts";
|
||||||
import { ClassifyVideoQueue } from "mq/index.ts";
|
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
@ -17,7 +17,7 @@ export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise<void> =>
|
|||||||
if (eta > 144) continue;
|
if (eta > 144) continue;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
const maxInterval = 4 * HOUR;
|
const maxInterval = 1 * HOUR;
|
||||||
const minInterval = 1 * SECOND;
|
const minInterval = 1 * SECOND;
|
||||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
const targetTime = now + delay;
|
const targetTime = now + delay;
|
||||||
|
@ -5,7 +5,7 @@ import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/
|
|||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { HOUR, MINUTE, SECOND } from "@std/datetime";
|
import { HOUR, MINUTE, SECOND } from "@std/datetime";
|
||||||
import { lockManager } from "mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
import { getBiliVideoStatus, setBiliVideoStatus } from "../../db/bilibili_metadata.ts";
|
||||||
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
||||||
import { getSongsPublihsedAt } from "db/songs.ts";
|
import { getSongsPublihsedAt } from "db/songs.ts";
|
||||||
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
|
import { getAdjustedShortTermETA } from "mq/scheduling.ts";
|
||||||
|
@ -3,7 +3,7 @@ import { getVideoDetails } from "net/getVideoDetails.ts";
|
|||||||
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
|
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { ClassifyVideoQueue } from "mq/index.ts";
|
import { ClassifyVideoQueue } from "mq/index.ts";
|
||||||
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
|
import { userExistsInBiliUsers, videoExistsInAllData } from "../../db/bilibili_metadata.ts";
|
||||||
import { HOUR, SECOND } from "@std/datetime";
|
import { HOUR, SECOND } from "@std/datetime";
|
||||||
|
|
||||||
export async function insertVideoInfo(client: Client, aid: number) {
|
export async function insertVideoInfo(client: Client, aid: number) {
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
|
import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
|
||||||
import { videoExistsInAllData } from "db/allData.ts";
|
import { videoExistsInAllData } from "../../db/bilibili_metadata.ts";
|
||||||
import { sleep } from "utils/sleep.ts";
|
import { sleep } from "utils/sleep.ts";
|
||||||
import { SECOND } from "@std/datetime";
|
import { SECOND } from "@std/datetime";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
Loading…
Reference in New Issue
Block a user