ref: use jobscheduler for getLatestVideos

This commit is contained in:
alikia2x (寒寒) 2025-02-09 21:06:14 +08:00
parent bc6c0283a8
commit 2585025f69
Signed by: alikia2x
GPG Key ID: 56209E0CCD8420C6
6 changed files with 120 additions and 52 deletions

66
lib/log/logger.ts Normal file
View File

@ -0,0 +1,66 @@
import winston, { format, transports } from "npm:winston";
import { TransformableInfo } from "npm:logform";
import chalk from "npm:chalk";
import stripAnsi from 'npm:strip-ansi';
/**
 * Line formatter used by the console transport.
 *
 * Produces `<timestamp> [<level>] <service><@codePath>: <message>`, where:
 * - the service name is printed in magenta when present;
 * - the grey `@codePath` suffix is only printed for debug-level entries;
 * - the `": "` separator appears whenever a service or a code path was
 *   supplied, even if the code path itself is not printed.
 */
const customFormat = format.printf((info: TransformableInfo) => {
	const { timestamp, level, message, service, codePath } = info;

	// `level` may already carry ANSI colour codes, so compare the stripped text.
	const isDebug = stripAnsi(level) === "debug";
	const separator = service || codePath ? ": " : "";

	let origin = service ? chalk.magenta(service) : "";
	if (isDebug && codePath) {
		origin += chalk.grey(`@${codePath}`);
	}

	return `${timestamp} [${level}] ${origin}${separator}${message}`;
});
// Full date + time (millisecond precision) stamp used by the file transports.
const timestampFormat = format.timestamp({ format: "YYYY-MM-DD HH:mm:ss.SSS" });

/**
 * Builds a file transport that writes JSON entries stamped with
 * `timestampFormat`.
 *
 * @param level    Level configured on the transport.
 * @param filename Path of the log file to write to.
 * @returns The configured `transports.File` instance.
 */
const createTransport = (level: string, filename: string) =>
	new transports.File({
		filename,
		level,
		format: format.combine(timestampFormat, format.json()),
	});
/**
 * Shared winston logger instance using the npm level set.
 *
 * Transports, in order:
 * 1. a colourised console transport at level "debug", rendered by
 *    `customFormat` with a time-only stamp;
 * 2. JSON file transports for "info" (logs/app.log), "warn" (logs/warn.log)
 *    and "error" (logs/error.log).
 */
const winstonLogger = winston.createLogger({
	levels: winston.config.npm.levels,
	transports: [
		new transports.Console({
			level: "debug",
			format: format.combine(
				// Time only: the console is read live, so the date is omitted.
				format.timestamp({ format: "HH:mm:ss.SSS" }),
				format.colorize(),
				customFormat,
			),
		}),
		...[
			{ level: "info", filename: "logs/app.log" },
			{ level: "warn", filename: "logs/warn.log" },
			{ level: "error", filename: "logs/error.log" },
		].map(({ level, filename }) => createTransport(level, filename)),
	],
});
/**
 * Thin facade over `winstonLogger` that attaches the optional `service`
 * (and, for debug entries, `codePath`) metadata to each record.
 */
const logger = {
	/**
	 * Logs `message` at the "info" level.
	 *
	 * @param message Text to log.
	 * @param service Optional name of the emitting service.
	 * @param target  Kept for backward compatibility. Every transport hangs off
	 *                the same winston logger, so one call already reaches both
	 *                the terminal and the log files; this value does not narrow
	 *                the destination.
	 */
	log: (message: string, service?: string, target: "term" | "file" | "both" = "both") => {
		// Previously "both" queued the "info" level twice, which wrote every
		// message twice to every transport. Emit exactly once instead.
		void target;
		winstonLogger.log("info", message, { service });
	},
	/**
	 * Logs `message` at the "debug" level.
	 *
	 * @param message  Text to log.
	 * @param service  Optional name of the emitting service.
	 * @param codePath Optional code location attached to the record as metadata.
	 */
	debug: (message: string, service?: string, codePath?: string) => {
		winstonLogger.debug(message, { service, codePath });
	},
	/**
	 * Logs `message` at the "warn" level.
	 *
	 * @param message Text to log.
	 * @param service Optional name of the emitting service.
	 */
	warn: (message: string, service?: string) => {
		winstonLogger.warn(message, { service });
	},
	/**
	 * Logs `message` at the "error" level.
	 *
	 * @param message Text to log.
	 * @param service Optional name of the emitting service.
	 */
	error: (message: string, service?: string) => {
		winstonLogger.error(message, { service });
	},
};
export default logger;

View File

@ -1,56 +1,39 @@
import { Job } from "bullmq";
import { redis } from "lib/db/redis.ts";
import { insertLatestVideos } from "lib/task/insertLatestVideo.ts";
import MainQueue from "lib/mq/index.ts";
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { db } from "lib/db/init.ts";
import { truncate } from "lib/utils/truncate.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
const LAST_EXECUTED_KEY = "job:insert-videos:last-executed";
const DELTA = 15 * SECOND;
const delayMap = [5, 10, 15, 30, 60, 60];
const setLastExecutedTimestamp = async () => {
await redis.set(LAST_EXECUTED_KEY, Date.now());
console.log(`[redis] job:getLatestVideos last executed timestamp set to ${Date.now()}`);
}
const addJobToQueue = async (failedCount: number, delay: number) => {
const job = await MainQueue.getJob("getLatestVideos");
if (job && job.getState() === 'active') {
console.log(`[bullmq] job:getLatestVideos is already running.`);
return;
}
console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`)
MainQueue.add("getLatestVideos", { failedCount }, { delay: delay })
const addJobToQueue = (failedCount: number, delay: number) => {
console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`);
MainQueue.upsertJobScheduler("getLatestVideos", {
every: delay,
}, {
data: {
failedCount: failedCount,
},
});
return;
};
export const insertVideosWorker = async (job: Job) => {
const failedCount = (job.data.failedCount ?? 0) as number;
const client = await db.connect();
const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY));
console.log(`[redis] job:getLatestVideos last executed at ${new Date(lastExecutedTimestamp).toISOString()}`)
const failedCount = (job.data.failedCount ?? 0) as number;
const client = await db.connect();
if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) {
await executeTask(client, failedCount);
return;
}
const diff = Date.now() - lastExecutedTimestamp;
if (diff < 5 * MINUTE) {
const waitTime = 5 * MINUTE - diff;
await addJobToQueue(0, waitTime + DELTA);
return;
}
await executeTask(client, failedCount);
await executeTask(client, failedCount);
return;
};
const executeTask = async (client: Client, failedCount: number) => {
console.log("[task] Executing task:getLatestVideos")
const result = await insertLatestVideos(client);
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
await setLastExecutedTimestamp();
await addJobToQueue(failedCount, delayMap[failedCount] * MINUTE);
console.log("[task] Executing task:getLatestVideos");
const result = await insertLatestVideos(client);
failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0;
if (failedCount !== 0) {
addJobToQueue(failedCount, delayMap[failedCount] * MINUTE);
}
return;
};

View File

@ -1,7 +1,10 @@
import { MINUTE } from "$std/datetime/constants.ts";
import MainQueue from "lib/mq/index.ts";
async function configGetLatestVideos() {
await MainQueue.add("getLatestVideos", {});
await MainQueue.upsertJobScheduler("getLatestVideos", {
every: 5 * MINUTE
})
}
export async function initMQ() {

View File

@ -1,22 +1,29 @@
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { AllDataType } from "lib/db/schema.d.ts";
export async function bisectVideoPageInNewList(timestamp: number): Promise<number | null> {
const pageSize = 50;
export async function getVideoPositionInNewList(timestamp: number): Promise<number | null | AllDataType[]> {
const virtualPageSize = 50;
let lowPage = 1;
let highPage = 1;
let foundUpper = false;
while (true) {
const videos = await getLatestVideos(highPage * pageSize, 1, 250, false);
const ps = highPage < 2 ? 50 : 1
const pn = highPage < 2 ? 1 : highPage * virtualPageSize;
const fetchTags = highPage < 2 ? true : false;
const videos = await getLatestVideos(pn, ps, 250, fetchTags);
if (!videos || videos.length === 0) {
break;
}
const lastVideo = videos[0];
const lastVideo = videos[videos.length - 1];
if (!lastVideo || !lastVideo.published_at) {
break;
}
const lastTime = Date.parse(lastVideo.published_at);
if (lastTime <= timestamp) {
if (lastTime <= timestamp && highPage == 1) {
return videos;
}
else if (lastTime <= timestamp) {
foundUpper = true;
break;
} else {
@ -34,7 +41,7 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise<numbe
let hi = highPage;
while (lo <= hi) {
const mid = Math.floor((lo + hi) / 2);
const videos = await getLatestVideos(mid * pageSize, 1, 250, false);
const videos = await getLatestVideos(mid * virtualPageSize, 1, 250, false);
if (!videos) {
return null;
}
@ -56,7 +63,7 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise<numbe
}
}
const boundaryVideos = await getLatestVideos(boundaryPage, pageSize, 250, false);
const boundaryVideos = await getLatestVideos(boundaryPage, virtualPageSize, 250, false);
let indexInPage = 0;
if (boundaryVideos && boundaryVideos.length > 0) {
for (let i = 0; i < boundaryVideos.length; i++) {
@ -73,7 +80,7 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise<numbe
}
}
const count = (boundaryPage - 1) * pageSize + indexInPage;
const count = (boundaryPage - 1) * virtualPageSize + indexInPage;
const safetyMargin = 5;

View File

@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getLatestVideos } from "lib/net/getLatestVideos.ts";
import { getLatestVideoTimestampFromAllData, insertIntoAllData, videoExistsInAllData } from "lib/db/allData.ts";
import { sleep } from "lib/utils/sleep.ts";
import { bisectVideoPageInNewList } from "lib/net/bisectVideoStartFrom.ts";
import { getVideoPositionInNewList } from "lib/net/bisectVideoStartFrom.ts";
import { SECOND } from "$std/datetime/constants.ts";
export async function insertLatestVideos(
@ -17,11 +17,20 @@ export async function insertLatestVideos(
return null
}
console.log(`[func:insertLatestVideos] Latest video in the database: ${new Date(latestVideoTimestamp).toISOString()}`)
const videoIndex = await bisectVideoPageInNewList(latestVideoTimestamp);
const videoIndex = await getVideoPositionInNewList(latestVideoTimestamp);
if (videoIndex == null) {
console.error("[func:insertLatestVideos] Cannot locate the video through bisect.");
return null
}
if (typeof videoIndex == "object") {
for (const video of videoIndex) {
const videoExists = await videoExistsInAllData(client, video.aid);
if (!videoExists) {
insertIntoAllData(client, video);
}
}
return 0;
}
let page = Math.floor(videoIndex / pageSize) + 1;
let failCount = 0;
const insertedVideos = new Set();

View File

@ -13,7 +13,7 @@ const worker = new Worker(
break;
}
},
{ connection: redis },
{ connection: redis, concurrency: 4 },
);
worker.on("active", () => {