Compare commits

...

2 Commits

Author SHA1 Message Date
48b1130cba
feat: continuous monitoring of new songs 2025-03-24 04:53:43 +08:00
42db333d1a
temp: schedule for new songs 2025-03-24 04:40:10 +08:00
4 changed files with 47 additions and 4 deletions

View File

@ -162,7 +162,7 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) { export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
if (await videoHasActiveSchedule(client, aid)) return; if (await videoHasActiveSchedule(client, aid)) return;
let adjustedTime = new Date(targetTime); let adjustedTime = new Date(targetTime);
if (type !== "milestone") { if (type !== "milestone" && type !== "new") {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis); adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
} }
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot"); logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");

View File

@ -1,4 +1,5 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(client: Client) { export async function getNotCollectedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(` const queryResult = await client.queryObject<{ aid: number }>(`
@ -22,8 +23,23 @@ export async function aidExistsInSongs(client: Client, aid: number) {
FROM songs FROM songs
WHERE aid = $1 WHERE aid = $1
); );
`, `,
[aid], [aid],
); );
return queryResult.rows[0].exists; return queryResult.rows[0].exists;
} }
export async function getSongsPublihsedAt(client: Client, aid: number) {
const queryResult = await client.queryObject<{ published_at: string }>(
`
SELECT published_at
FROM songs
WHERE aid = $1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return parseTimestampFromPsql(queryResult.rows[0].published_at);
}

View File

@ -21,6 +21,7 @@ import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts"; import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts"; import { truncate } from "lib/utils/truncate.ts";
import { lockManager } from "lib/mq/lockManager.ts"; import { lockManager } from "lib/mq/lockManager.ts";
import { getSongsPublihsedAt } from "lib/db/songs.ts";
const priorityMap: { [key: string]: number } = { const priorityMap: { [key: string]: number } = {
"milestone": 1, "milestone": 1,
@ -30,6 +31,7 @@ const priorityMap: { [key: string]: number } = {
const snapshotTypeToTaskMap: { [key: string]: string } = { const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo", "milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo", "normal": "snapshotVideo",
"new": "snapshotMilestoneVideo"
}; };
export const snapshotTickWorker = async (_job: Job) => { export const snapshotTickWorker = async (_job: Job) => {
@ -164,7 +166,6 @@ export const regularSnapshotsWorker = async (_job: Job) => {
const now = Date.now(); const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now; const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = await getRegularSnapshotInterval(client, aid); const interval = await getRegularSnapshotInterval(client, aid);
logger.debug(`${interval} hours for aid ${aid}`, "mq")
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK); const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime); await scheduleSnapshot(client, aid, "normal", targetTime);
if (now - startedAt > 25 * MINUTE) { if (now - startedAt > 25 * MINUTE) {
@ -203,9 +204,32 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
} }
await setSnapshotStatus(client, id, "completed"); await setSnapshotStatus(client, id, "completed");
if (type === "normal") { if (type === "normal") {
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR); const interval = await getRegularSnapshotInterval(client, aid);
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
return `DONE`; return `DONE`;
} }
else if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(client, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
if (timeSincePublished > 48 * HOUR) {
return `DONE`
}
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
return `DONE`
}
let intervalMins = 240;
if (viewsPerHour > 50) {
intervalMins = 120;
}
if (viewsPerHour > 100) {
intervalMins = 60;
}
if (viewsPerHour > 1000) {
intervalMins = 15;
}
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE);
}
if (type !== "milestone") return `DONE`; if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid); const eta = await getAdjustedShortTermETA(client, aid);
if (eta > 72) return "ETA_TOO_LONG"; if (eta > 72) return "ETA_TOO_LONG";

View File

@ -1,6 +1,8 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts"; import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
import logger from "lib/log/logger.ts"; import logger from "lib/log/logger.ts";
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export async function collectSongs(client: Client) { export async function collectSongs(client: Client) {
const aids = await getNotCollectedSongs(client); const aids = await getNotCollectedSongs(client);
@ -8,6 +10,7 @@ export async function collectSongs(client: Client) {
const exists = await aidExistsInSongs(client, aid); const exists = await aidExistsInSongs(client, aid);
if (exists) continue; if (exists) continue;
await insertIntoSongs(client, aid); await insertIntoSongs(client, aid);
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE);
logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs"); logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
} }
} }