From 2585025f69fec00daa293be8667d0040900518d6 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 9 Feb 2025 21:06:14 +0800 Subject: [PATCH] ref: use jobscheduler for getLatestVideos --- lib/log/logger.ts | 66 +++++++++++++++++++++++++++++++++ lib/mq/executors.ts | 63 ++++++++++++------------------- lib/mq/init.ts | 5 ++- lib/net/bisectVideoStartFrom.ts | 23 ++++++++---- lib/task/insertLatestVideo.ts | 13 ++++++- worker.ts | 2 +- 6 files changed, 120 insertions(+), 52 deletions(-) create mode 100644 lib/log/logger.ts diff --git a/lib/log/logger.ts b/lib/log/logger.ts new file mode 100644 index 0000000..d01f49e --- /dev/null +++ b/lib/log/logger.ts @@ -0,0 +1,66 @@ +import winston, { format, transports } from "npm:winston"; +import { TransformableInfo } from "npm:logform"; +import chalk from "npm:chalk"; +import stripAnsi from 'npm:strip-ansi'; + +const customFormat = format.printf((info: TransformableInfo) => { + const { timestamp, level, message, service, codePath } = info; + const coloredService = service ? chalk.magenta(service): ""; + const coloredCodePath = codePath ? chalk.grey(`@${codePath}`) : ""; + const colon = service || codePath ? ": " : ""; + + return stripAnsi(level) === "debug" + ? `${timestamp} [${level}] ${coloredService}${coloredCodePath}${colon}${message}` + : `${timestamp} [${level}] ${coloredService}${colon}${message}`; +}); + +const timestampFormat = format.timestamp({ format: "YYYY-MM-DD HH:mm:ss.SSS" }); + +const createTransport = (level: string, filename: string) => { + return new transports.File({ + level, + filename, + format: format.combine(timestampFormat, format.json()), + }); +}; + +const winstonLogger = winston.createLogger({ + levels: winston.config.npm.levels, + transports: [ + new transports.Console({ + level: "debug", + format: format.combine( + format.timestamp({ format: "HH:mm:ss.SSS" }), // Different format for console + format.colorize(), + customFormat, + ), + }), + createTransport("info", "logs/app.log"), + createTransport("warn", "logs/warn.log"), + createTransport("error", "logs/error.log"), + ], +}); + +const logger = { + log: (message: string, service?: string, target: "term" | "file" | "both" = "both") => { + const logLevels = []; + if (target === "term" || target === "both") { + logLevels.push("info"); + } + if (target === "file" || target === "both") { + logLevels.push("info"); + } + logLevels.forEach((level) => winstonLogger.log(level, message, { service })); + }, + debug: (message: string, service?: string, codePath?: string) => { + winstonLogger.debug(message, { service, codePath }); + }, + warn: (message: string, service?: string) => { + winstonLogger.warn(message, { service }); + }, + error: (message: string, service?: string) => { + winstonLogger.error(message, { service }); + }, +}; + +export default logger; diff --git a/lib/mq/executors.ts b/lib/mq/executors.ts index 1168b30..f5318dc 100644 --- a/lib/mq/executors.ts +++ b/lib/mq/executors.ts @@ -1,56 +1,39 @@ import { Job } from "bullmq"; -import { redis } from "lib/db/redis.ts"; import { insertLatestVideos } from "lib/task/insertLatestVideo.ts"; import MainQueue from "lib/mq/index.ts"; -import { MINUTE, SECOND } from "$std/datetime/constants.ts"; +import { MINUTE } from "$std/datetime/constants.ts"; import { db } from "lib/db/init.ts"; import { truncate } from "lib/utils/truncate.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -const LAST_EXECUTED_KEY = "job:insert-videos:last-executed"; -const DELTA = 15 * SECOND; const delayMap = [5, 10, 15, 30, 60, 60]; -const setLastExecutedTimestamp = async () => { - await redis.set(LAST_EXECUTED_KEY, Date.now()); - console.log(`[redis] job:getLatestVideos last executed timestamp set to ${Date.now()}`); -} - -const addJobToQueue = async (failedCount: number, delay: number) => { - const job = await MainQueue.getJob("getLatestVideos"); - if (job && job.getState() === 'active') { - console.log(`[bullmq] job:getLatestVideos is already running.`); - return; - } - console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`) - MainQueue.add("getLatestVideos", { failedCount }, { delay: delay }) +const addJobToQueue = (failedCount: number, delay: number) => { + console.log(`[bullmq] job:getLatestVideos added to queue with delay of ${delay / MINUTE} minutes.`); + MainQueue.upsertJobScheduler("getLatestVideos", { + every: delay, + }, { + data: { + failedCount: failedCount, + }, + }); + return; }; export const insertVideosWorker = async (job: Job) => { - const failedCount = (job.data.failedCount ?? 0) as number; - const client = await db.connect(); - const lastExecutedTimestamp = Number(await redis.get(LAST_EXECUTED_KEY)); - console.log(`[redis] job:getLatestVideos last executed at ${new Date(lastExecutedTimestamp).toISOString()}`) + const failedCount = (job.data.failedCount ?? 0) as number; + const client = await db.connect(); - if (!lastExecutedTimestamp || isNaN(lastExecutedTimestamp)) { - await executeTask(client, failedCount); - return; - } - - const diff = Date.now() - lastExecutedTimestamp; - if (diff < 5 * MINUTE) { - const waitTime = 5 * MINUTE - diff; - await addJobToQueue(0, waitTime + DELTA); - return; - } - - await executeTask(client, failedCount); + await executeTask(client, failedCount); + return; }; const executeTask = async (client: Client, failedCount: number) => { - console.log("[task] Executing task:getLatestVideos") - const result = await insertLatestVideos(client); - failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; - await setLastExecutedTimestamp(); - await addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); -}; \ No newline at end of file + console.log("[task] Executing task:getLatestVideos"); + const result = await insertLatestVideos(client); + failedCount = result !== 0 ? truncate(failedCount + 1, 0, 5) : 0; + if (failedCount !== 0) { + addJobToQueue(failedCount, delayMap[failedCount] * MINUTE); + } + return; +}; diff --git a/lib/mq/init.ts b/lib/mq/init.ts index caf8717..69cbad2 100644 --- a/lib/mq/init.ts +++ b/lib/mq/init.ts @@ -1,7 +1,10 @@ +import { MINUTE } from "$std/datetime/constants.ts"; import MainQueue from "lib/mq/index.ts"; async function configGetLatestVideos() { - await MainQueue.add("getLatestVideos", {}); + await MainQueue.upsertJobScheduler("getLatestVideos", { + every: 5 * MINUTE + }) } export async function initMQ() { diff --git a/lib/net/bisectVideoStartFrom.ts b/lib/net/bisectVideoStartFrom.ts index 2dbd206..d663e6c 100644 --- a/lib/net/bisectVideoStartFrom.ts +++ b/lib/net/bisectVideoStartFrom.ts @@ -1,22 +1,29 @@ import { getLatestVideos } from "lib/net/getLatestVideos.ts"; +import { AllDataType } from "lib/db/schema.d.ts"; -export async function bisectVideoPageInNewList(timestamp: number): Promise { - const pageSize = 50; +export async function getVideoPositionInNewList(timestamp: number): Promise { + const virtualPageSize = 50; let lowPage = 1; let highPage = 1; let foundUpper = false; while (true) { - const videos = await getLatestVideos(highPage * pageSize, 1, 250, false); + const ps = highPage < 2 ? 50 : 1 + const pn = highPage < 2 ? 1 : highPage * virtualPageSize; + const fetchTags = highPage < 2 ? true : false; + const videos = await getLatestVideos(pn, ps, 250, fetchTags); if (!videos || videos.length === 0) { break; } - const lastVideo = videos[0]; + const lastVideo = videos[videos.length - 1]; if (!lastVideo || !lastVideo.published_at) { break; } const lastTime = Date.parse(lastVideo.published_at); - if (lastTime <= timestamp) { + if (lastTime <= timestamp && highPage == 1) { + return videos; + } + else if (lastTime <= timestamp) { foundUpper = true; break; } else { @@ -34,7 +41,7 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise 0) { for (let i = 0; i < boundaryVideos.length; i++) { @@ -73,7 +80,7 @@ export async function bisectVideoPageInNewList(timestamp: number): Promise {