update: completed snapshot for videos close to milestone
This commit is contained in:
parent
5cc2afeb0e
commit
7ac2d2c217
@ -1,5 +1,7 @@
|
|||||||
|
import { DAY, SECOND } from "$std/datetime/constants.ts";
|
||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { VideoSnapshotType } from "lib/db/schema.d.ts";
|
import { VideoSnapshotType } from "lib/db/schema.d.ts";
|
||||||
|
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
|
|
||||||
export async function getSongsNearMilestone(client: Client) {
|
export async function getSongsNearMilestone(client: Client) {
|
||||||
const queryResult = await client.queryObject<VideoSnapshotType>(`
|
const queryResult = await client.queryObject<VideoSnapshotType>(`
|
||||||
@ -42,7 +44,7 @@ export async function getSongsNearMilestone(client: Client) {
|
|||||||
return {
|
return {
|
||||||
...row,
|
...row,
|
||||||
aid: Number(row.aid),
|
aid: Number(row.aid),
|
||||||
}
|
};
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,5 +55,81 @@ export async function getUnsnapshotedSongs(client: Client) {
|
|||||||
LEFT JOIN video_snapshot v ON s.aid = v.aid
|
LEFT JOIN video_snapshot v ON s.aid = v.aid
|
||||||
WHERE v.aid IS NULL;
|
WHERE v.aid IS NULL;
|
||||||
`);
|
`);
|
||||||
return queryResult.rows.map(row => Number(row.aid));
|
return queryResult.rows.map((row) => Number(row.aid));
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function getSongSnapshotCount(client: Client, aid: number) {
|
||||||
|
const queryResult = await client.queryObject<{ count: number }>(`
|
||||||
|
SELECT COUNT(*) AS count
|
||||||
|
FROM video_snapshot
|
||||||
|
WHERE aid = $1;
|
||||||
|
`, [aid]);
|
||||||
|
return queryResult.rows[0].count;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function songEligibleForMilestoneSnapshot(client: Client, aid: number) {
|
||||||
|
const count = await getSongSnapshotCount(client, aid);
|
||||||
|
if (count < 2) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const queryResult = await client.queryObject<{ views1: number, created_at1: string, views2: number, created_at2: string }>(
|
||||||
|
`
|
||||||
|
WITH latest_snapshot AS (
|
||||||
|
SELECT
|
||||||
|
aid,
|
||||||
|
views,
|
||||||
|
created_at
|
||||||
|
FROM video_snapshot
|
||||||
|
WHERE aid = $1
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
),
|
||||||
|
pairs AS (
|
||||||
|
SELECT
|
||||||
|
a.views AS views1,
|
||||||
|
a.created_at AS created_at1,
|
||||||
|
b.views AS views2,
|
||||||
|
b.created_at AS created_at2,
|
||||||
|
(b.created_at - a.created_at) AS interval
|
||||||
|
FROM video_snapshot a
|
||||||
|
JOIN latest_snapshot b
|
||||||
|
ON a.aid = b.aid
|
||||||
|
AND a.created_at < b.created_at
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
views1,
|
||||||
|
created_at1,
|
||||||
|
views2,
|
||||||
|
created_at2
|
||||||
|
FROM (
|
||||||
|
SELECT
|
||||||
|
*,
|
||||||
|
ROW_NUMBER() OVER (
|
||||||
|
ORDER BY
|
||||||
|
CASE WHEN interval <= INTERVAL '3 days' THEN 0 ELSE 1 END,
|
||||||
|
CASE WHEN interval <= INTERVAL '3 days' THEN -interval ELSE interval END
|
||||||
|
) AS rn
|
||||||
|
FROM pairs
|
||||||
|
) ranked
|
||||||
|
WHERE rn = 1;
|
||||||
|
`,
|
||||||
|
[aid],
|
||||||
|
);
|
||||||
|
if (queryResult.rows.length === 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
const recentViewsData = queryResult.rows[0];
|
||||||
|
const time1 = parseTimestampFromPsql(recentViewsData.created_at1);
|
||||||
|
const time2 = parseTimestampFromPsql(recentViewsData.created_at2);
|
||||||
|
const intervalSec = (time2 - time1) / SECOND;
|
||||||
|
const views1 = recentViewsData.views1;
|
||||||
|
const views2 = recentViewsData.views2;
|
||||||
|
const viewsDiff = views2 - views1;
|
||||||
|
if (viewsDiff == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
const nextMilestone = views2 >= 100000 ? 1000000 : 100000;
|
||||||
|
const expectedViewsDiff = nextMilestone - views2;
|
||||||
|
const expectedIntervalSec = expectedViewsDiff / viewsDiff * intervalSec;
|
||||||
|
return expectedIntervalSec <= 3 * DAY;
|
||||||
}
|
}
|
||||||
|
@ -52,7 +52,7 @@ const createTransport = (level: string, filename: string) => {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
const verboseLogPath = Deno.env.get("LOG_VERBOSE") ?? "logs/verbose.log";
|
const sillyLogPath = Deno.env.get("LOG_VERBOSE") ?? "logs/verbose.log";
|
||||||
const warnLogPath = Deno.env.get("LOG_WARN") ?? "logs/warn.log";
|
const warnLogPath = Deno.env.get("LOG_WARN") ?? "logs/warn.log";
|
||||||
const errorLogPath = Deno.env.get("LOG_ERROR") ?? "logs/error.log";
|
const errorLogPath = Deno.env.get("LOG_ERROR") ?? "logs/error.log";
|
||||||
|
|
||||||
@ -68,13 +68,16 @@ const winstonLogger = winston.createLogger({
|
|||||||
customFormat,
|
customFormat,
|
||||||
),
|
),
|
||||||
}),
|
}),
|
||||||
createTransport("verbose", verboseLogPath),
|
createTransport("silly", sillyLogPath),
|
||||||
createTransport("warn", warnLogPath),
|
createTransport("warn", warnLogPath),
|
||||||
createTransport("error", errorLogPath),
|
createTransport("error", errorLogPath),
|
||||||
],
|
],
|
||||||
});
|
});
|
||||||
|
|
||||||
const logger = {
|
const logger = {
|
||||||
|
silly: (message: string, service?: string, codePath?: string) => {
|
||||||
|
winstonLogger.silly(message, { service, codePath });
|
||||||
|
},
|
||||||
verbose: (message: string, service?: string, codePath?: string) => {
|
verbose: (message: string, service?: string, codePath?: string) => {
|
||||||
winstonLogger.verbose(message, { service, codePath });
|
winstonLogger.verbose(message, { service, codePath });
|
||||||
},
|
},
|
||||||
|
@ -1,13 +1,15 @@
|
|||||||
import { Job } from "bullmq";
|
import { Job } from "bullmq";
|
||||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { db } from "lib/db/init.ts";
|
import { db } from "lib/db/init.ts";
|
||||||
import { getSongsNearMilestone, getUnsnapshotedSongs } from "lib/db/snapshot.ts";
|
import { getSongsNearMilestone, getUnsnapshotedSongs, songEligibleForMilestoneSnapshot } from "lib/db/snapshot.ts";
|
||||||
import { SnapshotQueue } from "lib/mq/index.ts";
|
import { SnapshotQueue } from "lib/mq/index.ts";
|
||||||
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
|
import { insertVideoStats } from "lib/mq/task/getVideoStats.ts";
|
||||||
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "lib/db/redis.ts";
|
||||||
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
import { NetSchedulerError } from "lib/mq/scheduler.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
import { formatSeconds } from "lib/utils/formatSeconds.ts";
|
||||||
|
|
||||||
async function snapshotScheduled(aid: number) {
|
async function snapshotScheduled(aid: number) {
|
||||||
try {
|
try {
|
||||||
@ -43,19 +45,24 @@ interface SongNearMilestone {
|
|||||||
replies: number;
|
replies: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function processMilestoneSnapshots(vidoesNearMilestone: SongNearMilestone[]) {
|
async function processMilestoneSnapshots(client: Client, vidoesNearMilestone: SongNearMilestone[]) {
|
||||||
let i = 0;
|
let i = 0;
|
||||||
for (const snapshot of vidoesNearMilestone) {
|
for (const snapshot of vidoesNearMilestone) {
|
||||||
if (await snapshotScheduled(snapshot.aid)) {
|
if (await snapshotScheduled(snapshot.aid)) {
|
||||||
|
logger.silly(`Video ${snapshot.aid} is already scheduled for snapshot`, "mq", "fn:processMilestoneSnapshots");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (await songEligibleForMilestoneSnapshot(client, snapshot.aid) === false) {
|
||||||
|
logger.silly(`Video ${snapshot.aid} is not eligible for milestone snapshot`, "mq", "fn:processMilestoneSnapshots");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const factor = Math.floor(i / 8);
|
const factor = Math.floor(i / 8);
|
||||||
const delayTime = factor * SECOND * 2;
|
const delayTime = factor * SECOND * 2;
|
||||||
SnapshotQueue.add("snapshotMilestoneVideo", {
|
await SnapshotQueue.add("snapshotMilestoneVideo", {
|
||||||
aid: snapshot.aid,
|
aid: snapshot.aid,
|
||||||
currentViews: snapshot.views,
|
currentViews: snapshot.views,
|
||||||
snapshotedAt: snapshot.created_at,
|
snapshotedAt: snapshot.created_at,
|
||||||
}, { delay: delayTime });
|
}, { delay: delayTime, priority: 1 });
|
||||||
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
|
await setSnapshotScheduled(snapshot.aid, true, 20 * 60);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
@ -65,13 +72,14 @@ async function processUnsnapshotedVideos(unsnapshotedVideos: number[]) {
|
|||||||
let i = 0;
|
let i = 0;
|
||||||
for (const aid of unsnapshotedVideos) {
|
for (const aid of unsnapshotedVideos) {
|
||||||
if (await snapshotScheduled(aid)) {
|
if (await snapshotScheduled(aid)) {
|
||||||
|
logger.silly(`Video ${aid} is already scheduled for snapshot`, "mq", "fn:processUnsnapshotedVideos");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const factor = Math.floor(i / 5);
|
const factor = Math.floor(i / 5);
|
||||||
const delayTime = factor * SECOND * 4;
|
const delayTime = factor * SECOND * 4;
|
||||||
SnapshotQueue.add("snapshotVideo", {
|
await SnapshotQueue.add("snapshotVideo", {
|
||||||
aid,
|
aid,
|
||||||
}, { delay: delayTime });
|
}, { delay: delayTime, priority: 3 });
|
||||||
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
await setSnapshotScheduled(aid, true, 6 * 60 * 60);
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
@ -81,7 +89,7 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
try {
|
try {
|
||||||
const vidoesNearMilestone = await getSongsNearMilestone(client);
|
const vidoesNearMilestone = await getSongsNearMilestone(client);
|
||||||
await processMilestoneSnapshots(vidoesNearMilestone);
|
await processMilestoneSnapshots(client, vidoesNearMilestone);
|
||||||
|
|
||||||
const unsnapshotedVideos = await getUnsnapshotedSongs(client);
|
const unsnapshotedVideos = await getUnsnapshotedSongs(client);
|
||||||
await processUnsnapshotedVideos(unsnapshotedVideos);
|
await processUnsnapshotedVideos(unsnapshotedVideos);
|
||||||
@ -94,30 +102,42 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
|
|||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
|
await setSnapshotScheduled(job.data.aid, true, 20 * 60);
|
||||||
try {
|
try {
|
||||||
const { aid, currentViews, lastSnapshoted } = job.data;
|
const { aid, currentViews, snapshotedAt } = job.data;
|
||||||
|
const lastSnapshoted = snapshotedAt;
|
||||||
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
|
const stat = await insertVideoStats(client, aid, "snapshotMilestoneVideo");
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
setSnapshotScheduled(aid, false, 0);
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
|
const nextMilestone = currentViews >= 100000 ? 1000000 : 100000;
|
||||||
if (stat.views >= nextMilestone) {
|
if (stat.views >= nextMilestone) {
|
||||||
setSnapshotScheduled(aid, false, 0);
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
const DELTA = 0.001;
|
||||||
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
|
const intervalSeconds = (Date.now() - parseTimestampFromPsql(lastSnapshoted)) / SECOND;
|
||||||
const viewsIncrement = stat.views - currentViews;
|
const viewsIncrement = stat.views - currentViews;
|
||||||
const incrementSpeed = viewsIncrement / intervalSeconds;
|
const incrementSpeed = viewsIncrement / (intervalSeconds + DELTA);
|
||||||
const viewsToIncrease = nextMilestone - stat.views;
|
const viewsToIncrease = nextMilestone - stat.views;
|
||||||
const eta = viewsToIncrease / incrementSpeed;
|
const eta = viewsToIncrease / (incrementSpeed + DELTA);
|
||||||
const scheduledNextSnapshotDelay = eta * SECOND / 3;
|
const scheduledNextSnapshotDelay = eta * SECOND / 3;
|
||||||
const maxInterval = 20 * MINUTE;
|
const maxInterval = 20 * MINUTE;
|
||||||
const delay = Math.min(scheduledNextSnapshotDelay, maxInterval);
|
const delay = Math.min(scheduledNextSnapshotDelay, maxInterval);
|
||||||
SnapshotQueue.add("snapshotMilestoneVideo", {
|
await SnapshotQueue.add("snapshotMilestoneVideo", {
|
||||||
aid,
|
aid,
|
||||||
currentViews: stat.views,
|
currentViews: stat.views,
|
||||||
snapshotedAt: stat.time,
|
snapshotedAt: stat.time,
|
||||||
}, { delay });
|
}, { delay, priority: 1});
|
||||||
|
await job.updateData({
|
||||||
|
...job.data,
|
||||||
|
updatedViews: stat.views,
|
||||||
|
updatedTime: new Date(stat.time).toISOString(),
|
||||||
|
etaInMins: eta / 60,
|
||||||
|
});
|
||||||
|
logger.log(
|
||||||
|
`Scheduled next milestone snapshot for ${aid} in ${formatSeconds(delay / 1000)}, current views: ${stat.views}`,
|
||||||
|
"mq",
|
||||||
|
);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
|
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
@ -125,11 +145,11 @@ export const takeSnapshotForMilestoneVideoWorker = async (job: Job) => {
|
|||||||
"mq",
|
"mq",
|
||||||
"fn:takeSnapshotForMilestoneVideoWorker",
|
"fn:takeSnapshotForMilestoneVideoWorker",
|
||||||
);
|
);
|
||||||
SnapshotQueue.add("snapshotMilestoneVideo", {
|
await SnapshotQueue.add("snapshotMilestoneVideo", {
|
||||||
aid: job.data.aid,
|
aid: job.data.aid,
|
||||||
currentViews: job.data.currentViews,
|
currentViews: job.data.currentViews,
|
||||||
snapshotedAt: job.data.snapshotedAt,
|
snapshotedAt: job.data.snapshotedAt,
|
||||||
}, { delay: 5 * SECOND });
|
}, { delay: 5 * SECOND, priority: 1 });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
@ -144,29 +164,29 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
try {
|
try {
|
||||||
const { aid } = job.data;
|
const { aid } = job.data;
|
||||||
const stat = await insertVideoStats(client, aid, "getVideoInfo");
|
const stat = await insertVideoStats(client, aid, "getVideoInfo");
|
||||||
|
logger.log(`Taken snapshot for ${aid}`, "mq");
|
||||||
if (stat == null) {
|
if (stat == null) {
|
||||||
setSnapshotScheduled(aid, false, 0);
|
setSnapshotScheduled(aid, false, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
await job.updateData({
|
||||||
|
...job.data,
|
||||||
|
updatedViews: stat.views,
|
||||||
|
updatedTime: new Date(stat.time).toISOString(),
|
||||||
|
});
|
||||||
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
|
const nearMilestone = (stat.views >= 90000 && stat.views < 100000) ||
|
||||||
(stat.views >= 900000 && stat.views < 1000000);
|
(stat.views >= 900000 && stat.views < 1000000);
|
||||||
if (nearMilestone) {
|
if (nearMilestone) {
|
||||||
SnapshotQueue.add("snapshotMilestoneVideo", {
|
await SnapshotQueue.add("snapshotMilestoneVideo", {
|
||||||
aid,
|
aid,
|
||||||
currentViews: stat.views,
|
currentViews: stat.views,
|
||||||
snapshotedAt: stat.time,
|
snapshotedAt: stat.time,
|
||||||
}, { delay: 0 });
|
}, { delay: 0, priority: 1 });
|
||||||
}
|
}
|
||||||
|
await setSnapshotScheduled(aid, false, 0);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
|
if (e instanceof NetSchedulerError && e.code === "NO_AVAILABLE_PROXY") {
|
||||||
logger.warn(
|
await setSnapshotScheduled(job.data.aid, false, 0);
|
||||||
`No available proxy for aid ${job.data.aid}.`,
|
|
||||||
"mq",
|
|
||||||
"fn:takeSnapshotForMilestoneVideoWorker",
|
|
||||||
);
|
|
||||||
SnapshotQueue.add("snapshotVideo", {
|
|
||||||
aid: job.data.aid,
|
|
||||||
}, { delay: 10 * SECOND });
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
throw e;
|
throw e;
|
||||||
|
@ -311,6 +311,10 @@ const biliLimiterConfig: RateLimiterConfig[] = [
|
|||||||
window: new SlidingWindow(redis, 1),
|
window: new SlidingWindow(redis, 1),
|
||||||
max: 6,
|
max: 6,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
window: new SlidingWindow(redis, 5),
|
||||||
|
max: 20,
|
||||||
|
},
|
||||||
{
|
{
|
||||||
window: new SlidingWindow(redis, 30),
|
window: new SlidingWindow(redis, 30),
|
||||||
max: 100,
|
max: 100,
|
||||||
|
9
lib/utils/formatSeconds.ts
Normal file
9
lib/utils/formatSeconds.ts
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
export const formatSeconds = (seconds: number) => {
|
||||||
|
if (seconds < 60) {
|
||||||
|
return `${(seconds).toFixed(1)}s`;
|
||||||
|
}
|
||||||
|
if (seconds < 3600) {
|
||||||
|
return `${Math.floor(seconds / 60)}m${seconds % 60}s`;
|
||||||
|
}
|
||||||
|
return `${Math.floor(seconds / 3600)}h ${((seconds % 3600) / 60).toFixed(2)}m`;
|
||||||
|
};
|
@ -69,13 +69,9 @@ const snapshotWorker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis, concurrency: 20, removeOnComplete: { count: 1440 } },
|
{ connection: redis, concurrency: 10, removeOnComplete: { count: 2000 } },
|
||||||
);
|
);
|
||||||
|
|
||||||
snapshotWorker.on("active", () => {
|
|
||||||
logger.log("Worker (snapshot) activated.", "mq");
|
|
||||||
})
|
|
||||||
|
|
||||||
snapshotWorker.on("error", (err) => {
|
snapshotWorker.on("error", (err) => {
|
||||||
const e = err as WorkerError;
|
const e = err as WorkerError;
|
||||||
logger.error(e.rawError, e.service, e.codePath);
|
logger.error(e.rawError, e.service, e.codePath);
|
||||||
|
Loading…
Reference in New Issue
Block a user