From 125a30f0dfc0fd67c130bb84b249d28a7b88c883 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Thu, 30 Oct 2025 01:46:34 +0800 Subject: [PATCH] add: metrics for queue --- packages/crawler/metrics/index.ts | 4 ++++ .../crawler/mq/exec/dispatchMilestoneSnapshots.ts | 12 +++++++++++- packages/crawler/mq/exec/getLatestVideos.ts | 7 +++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/packages/crawler/metrics/index.ts b/packages/crawler/metrics/index.ts index eb2c0ab..07dd5ee 100644 --- a/packages/crawler/metrics/index.ts +++ b/packages/crawler/metrics/index.ts @@ -20,6 +20,10 @@ export const jobCounter = meter.createCounter("job_count", { description: "Number of executed BullMQ jobs" }); +export const queueJobsCounter = meter.createGauge("queue_jobs_count", { + description: "Number of jobs in specific BullMQ queue" +}); + export const jobDuration = meter.createHistogram("job_duration", { description: "Execution duration of BullMQ jobs in milliseconds", unit: "ms" diff --git a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts index 0022e02..10c5984 100644 --- a/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts @@ -6,8 +6,10 @@ import { scheduleSnapshot } from "db/snapshotSchedule"; import logger from "@core/log"; import { HOUR, MINUTE, SECOND } from "@core/lib"; import { sql } from "@core/db/dbNew"; +import { jobCounter, jobDuration } from "metrics"; export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { + const start = Date.now(); try { const videos = await getVideosNearMilestone(sql); for (const video of videos) { @@ -21,9 +23,17 @@ export const dispatchMilestoneSnapshotsWorker = async (_job: Job) => { const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval); const targetTime = now + delay; await scheduleSnapshot(sql, aid, "milestone", targetTime); - logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq"); + logger.log( + `Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, + "mq" + ); } } catch (e) { logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker"); + } finally { + const duration = Date.now() - start; + + jobCounter.add(1, { jobName: "dispatchMilestoneSnapshots" }); + jobDuration.record(duration, { jobName: "dispatchMilestoneSnapshots" }); } }; diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index 85a72e1..53d3b31 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -1,7 +1,14 @@ import { sql } from "@core/db/dbNew"; import { Job } from "bullmq"; +import { queueJobsCounter } from "metrics"; +import { SnapshotQueue } from "mq"; import { queueLatestVideos } from "mq/task/queueLatestVideo"; export const getLatestVideosWorker = async (_job: Job): Promise => { await queueLatestVideos(sql); + const counts = await SnapshotQueue.getJobCounts(); + const waiting = counts?.waiting; + const prioritized = counts?.prioritized; + waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" }); + prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" }); };