fix: job consumption rate too low, add outdated job cleanup
This commit is contained in:
parent
48b1130cba
commit
fa058b22fe
@ -217,7 +217,7 @@ export async function getSnapshotsInNextSecond(client: Client) {
|
|||||||
ELSE 1
|
ELSE 1
|
||||||
END,
|
END,
|
||||||
started_at
|
started_at
|
||||||
LIMIT 3;
|
LIMIT 10;
|
||||||
`;
|
`;
|
||||||
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
return res.rows;
|
return res.rows;
|
||||||
|
@ -31,7 +31,7 @@ const priorityMap: { [key: string]: number } = {
|
|||||||
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
const snapshotTypeToTaskMap: { [key: string]: string } = {
|
||||||
"milestone": "snapshotMilestoneVideo",
|
"milestone": "snapshotMilestoneVideo",
|
||||||
"normal": "snapshotVideo",
|
"normal": "snapshotVideo",
|
||||||
"new": "snapshotMilestoneVideo"
|
"new": "snapshotMilestoneVideo",
|
||||||
};
|
};
|
||||||
|
|
||||||
export const snapshotTickWorker = async (_job: Job) => {
|
export const snapshotTickWorker = async (_job: Job) => {
|
||||||
@ -134,7 +134,7 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
|
|
||||||
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const date = new Date(now - 24 * HOUR);
|
const date = new Date(now - 24 * HOUR);
|
||||||
const oldSnapshot = await findClosestSnapshot(client, aid, date);
|
const oldSnapshot = await findClosestSnapshot(client, aid, date);
|
||||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||||
if (!oldSnapshot || !latestSnapshot) return 0;
|
if (!oldSnapshot || !latestSnapshot) return 0;
|
||||||
@ -148,7 +148,7 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
|||||||
if (speedPerDay < 120) return 24;
|
if (speedPerDay < 120) return 24;
|
||||||
if (speedPerDay < 320) return 12;
|
if (speedPerDay < 320) return 12;
|
||||||
return 6;
|
return 6;
|
||||||
}
|
};
|
||||||
|
|
||||||
export const regularSnapshotsWorker = async (_job: Job) => {
|
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
@ -207,26 +207,25 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
const interval = await getRegularSnapshotInterval(client, aid);
|
const interval = await getRegularSnapshotInterval(client, aid);
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
|
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
|
||||||
return `DONE`;
|
return `DONE`;
|
||||||
}
|
} else if (type === "new") {
|
||||||
else if (type === "new") {
|
|
||||||
const publihsedAt = await getSongsPublihsedAt(client, aid);
|
const publihsedAt = await getSongsPublihsedAt(client, aid);
|
||||||
const timeSincePublished = stat.time - publihsedAt!;
|
const timeSincePublished = stat.time - publihsedAt!;
|
||||||
const viewsPerHour = stat.views / timeSincePublished * HOUR;
|
const viewsPerHour = stat.views / timeSincePublished * HOUR;
|
||||||
if (timeSincePublished > 48 * HOUR) {
|
if (timeSincePublished > 48 * HOUR) {
|
||||||
return `DONE`
|
return `DONE`;
|
||||||
}
|
}
|
||||||
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
|
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
|
||||||
return `DONE`
|
return `DONE`;
|
||||||
}
|
}
|
||||||
let intervalMins = 240;
|
let intervalMins = 240;
|
||||||
if (viewsPerHour > 50) {
|
if (viewsPerHour > 50) {
|
||||||
intervalMins = 120;
|
intervalMins = 120;
|
||||||
}
|
}
|
||||||
if (viewsPerHour > 100) {
|
if (viewsPerHour > 100) {
|
||||||
intervalMins = 60;
|
intervalMins = 60;
|
||||||
}
|
}
|
||||||
if (viewsPerHour > 1000) {
|
if (viewsPerHour > 1000) {
|
||||||
intervalMins = 15;
|
intervalMins = 15;
|
||||||
}
|
}
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE);
|
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE);
|
||||||
}
|
}
|
||||||
@ -254,3 +253,28 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
client.release();
|
client.release();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const scheduleCleanupWorker = async (_job: Job) => {
|
||||||
|
const client = await db.connect();
|
||||||
|
try {
|
||||||
|
const query = `
|
||||||
|
SELECT id, aid, type
|
||||||
|
FROM snapshot_schedule
|
||||||
|
WHERE status IN ('pending', 'processing')
|
||||||
|
AND started_at < NOW() - INTERVAL '5 minutes'
|
||||||
|
`;
|
||||||
|
const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
|
||||||
|
if (rows.length === 0) return;
|
||||||
|
for (const row of rows) {
|
||||||
|
const id = Number(row.id);
|
||||||
|
const aid = Number(row.aid);
|
||||||
|
const type = row.type;
|
||||||
|
await setSnapshotStatus(client, id, "timeout");
|
||||||
|
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
|
||||||
|
} finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
@ -10,6 +10,7 @@ import {
|
|||||||
regularSnapshotsWorker,
|
regularSnapshotsWorker,
|
||||||
snapshotTickWorker,
|
snapshotTickWorker,
|
||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
|
scheduleCleanupWorker
|
||||||
} from "lib/mq/exec/snapshotTick.ts";
|
} from "lib/mq/exec/snapshotTick.ts";
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
@ -80,6 +81,9 @@ const snapshotWorker = new Worker(
|
|||||||
case "dispatchRegularSnapshots":
|
case "dispatchRegularSnapshots":
|
||||||
await regularSnapshotsWorker(job);
|
await regularSnapshotsWorker(job);
|
||||||
break;
|
break;
|
||||||
|
case "scheduleCleanup":
|
||||||
|
await scheduleCleanupWorker(job);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user