improve: better eta prediction
This commit is contained in:
parent
f177d96817
commit
bce4161501
@ -59,20 +59,58 @@ export async function getUnsnapshotedSongs(client: Client) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export async function getSongSnapshotCount(client: Client, aid: number) {
|
export async function getSongSnapshotCount(client: Client, aid: number) {
|
||||||
const queryResult = await client.queryObject<{ count: number }>(`
|
const queryResult = await client.queryObject<{ count: number }>(
|
||||||
|
`
|
||||||
SELECT COUNT(*) AS count
|
SELECT COUNT(*) AS count
|
||||||
FROM video_snapshot
|
FROM video_snapshot
|
||||||
WHERE aid = $1;
|
WHERE aid = $1;
|
||||||
`, [aid]);
|
`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
return queryResult.rows[0].count;
|
return queryResult.rows[0].count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getShortTermEtaPrediction(client: Client, aid: number) {
|
||||||
|
const queryResult = await client.queryObject<{eta: number}>(
|
||||||
|
`
|
||||||
|
WITH old_snapshot AS (
|
||||||
|
SELECT created_at, views
|
||||||
|
FROM video_snapshot
|
||||||
|
WHERE aid = $1 AND
|
||||||
|
NOW() - created_at > '20 min'
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
),
|
||||||
|
new_snapshot AS (
|
||||||
|
SELECT created_at, views
|
||||||
|
FROM video_snapshot
|
||||||
|
WHERE aid = $1
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
CASE
|
||||||
|
WHEN n.views > 100000 THEN (1000000 - n.views) / ((n.views - o.views) / (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.01))
|
||||||
|
ELSE (100000 - n.views) / ((n.views - o.views) / (EXTRACT(EPOCH FROM (n.created_at - o.created_at)) + 0.01))
|
||||||
|
END AS eta
|
||||||
|
FROM old_snapshot o, new_snapshot n;
|
||||||
|
`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
if (queryResult.rows.length === 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return queryResult.rows[0].eta;
|
||||||
|
}
|
||||||
|
|
||||||
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
|
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
|
||||||
const count = await getSongSnapshotCount(client, aid);
|
const count = await getSongSnapshotCount(client, aid);
|
||||||
if (count < 2) {
|
if (count < 2) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
const queryResult = await client.queryObject<{ views1: number, created_at1: string, views2: number, created_at2: string }>(
|
const queryResult = await client.queryObject<
|
||||||
|
{ views1: number; created_at1: string; views2: number; created_at2: string }
|
||||||
|
>(
|
||||||
`
|
`
|
||||||
WITH latest_snapshot AS (
|
WITH latest_snapshot AS (
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -2,7 +2,12 @@ import { Job } from "bullmq";
|
|||||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "lib/db/init.ts";
|
||||||
import { getSongsNearMilestone, getUnsnapshotedSongs, songEligibleForMilestoneSnapshot } from "lib/db/snapshot.ts";
|
import {
|
||||||
|
getShortTermEtaPrediction,
|
||||||
|
getSongsNearMilestone,
|
||||||
|
getUnsnapshotedSongs,
|
||||||
|
songEligibleForMilestoneSnapshot,
|
||||||
|
} from "lib/db/snapshot.ts";
|
||||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
import { SnapshotQueue } from "lib/mq/index.ts";
|
||||||
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
|
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
|
||||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
@ -50,11 +55,19 @@ async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: So
|
|||||||
let i = 0;
|
let i = 0;
|
||||||
for (const snapshot of vidoesNearMilestone) {
|
for (const snapshot of vidoesNearMilestone) {
|
||||||
if (await snapshotScheduled(snapshot.aid)) {
|
if (await snapshotScheduled(snapshot.aid)) {
|
||||||
logger.silly(`Video ${snapshot.aid} is already scheduled for snapshot`, "mq", "fn:processMilestoneSnapshots");
|
logger.silly(
|
||||||
|
`Video ${snapshot.aid} is already scheduled for snapshot`,
|
||||||
|
"mq",
|
||||||
|
"fn:processMilestoneSnapshots",
|
||||||
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
|
if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
|
||||||
logger.silly(`Video ${snapshot.aid} is not eligible for milestone snapshot`, "mq", "fn:processMilestoneSnapshots");
|
logger.silly(
|
||||||
|
`Video ${snapshot.aid} is not eligible for milestone snapshot`,
|
||||||
|
"mq",
|
||||||
|
"fn:processMilestoneSnapshots",
|
||||||
|
);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const factor = Math.floor(i / 8);
|
const factor = Math.floor(i / 8);
|
||||||
@ -103,14 +116,14 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
|
|||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
|
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
|
||||||
try {
|
try {
|
||||||
const { aid, currentViews, snapshotedAt } = job.data;
|
const aid: number = job.data.aid;
|
||||||
const lastSnapshoted = snapshotedAt;
|
const currentViews: number = job.data.currentViews;
|
||||||
|
const lastSnapshoted: string = job.data.snapshotedAt;
|
||||||
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
|
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
|
||||||
if (typeof stat === "number") {
|
if (typeof stat === "number") {
|
||||||
if (stat === -404 || stat === 62002 || stat == 62012) {
|
if (stat === -404 || stat === 62002 || stat == 62012) {
|
||||||
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
await setSnapshotScheduled(aid, false, 0);
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
@ -120,12 +133,15 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
|
|||||||
await setSnapshotScheduled(aid, false, 0);
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
let eta = await getShortTermEtaPrediction(client, aid);
|
||||||
|
if (eta === null) {
|
||||||
const DELTA = 0.001;
|
const DELTA = 0.001;
|
||||||
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
|
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
|
||||||
const viewsIncrement = stat.views - currentViews;
|
const viewsIncrement = stat.views - currentViews;
|
||||||
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
|
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
|
||||||
const viewsToIncrease = nextMilestone - stat.views;
|
const viewsToIncrease = nextMilestone - stat.views;
|
||||||
const eta = viewsToIncrease / (incrementSpeed + DELTA);
|
eta = viewsToIncrease / (incrementSpeed + DELTA);
|
||||||
|
}
|
||||||
const scheduledNextSnapshotDelay = eta * SECOND / 3;
|
const scheduledNextSnapshotDelay = eta * SECOND / 3;
|
||||||
const maxInterval = 20 * MINUTE;
|
const maxInterval = 20 * MINUTE;
|
||||||
const minInterval = 1 * SECOND;
|
const minInterval = 1 * SECOND;
|
||||||
@ -142,7 +158,9 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
|
|||||||
etaInMins: eta / 60,
|
etaInMins: eta / 60,
|
||||||
});
|
});
|
||||||
logger.log(
|
logger.log(
|
||||||
`Scheduled next milestone snapshot for ${aid} in ${formatSeconds(delay / 1000)}, current views: ${stat.views}`,
|
`Scheduled next milestone snapshot for ${aid} in ${
|
||||||
|
formatSeconds(delay / 1000)
|
||||||
|
}, current views: ${stat.views}`,
|
||||||
"mq",
|
"mq",
|
||||||
);
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
@ -174,8 +192,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
if (typeof stat === "number") {
|
if (typeof stat === "number") {
|
||||||
if (stat === -404 || stat === 62002 || stat == 62012) {
|
if (stat === -404 || stat === 62002 || stat == 62012) {
|
||||||
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
||||||
}
|
} else {
|
||||||
else {
|
|
||||||
await setSnapshotScheduled(aid, false, 0);
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
|
Loading…
Reference in New Issue
Block a user