diff --git a/packages/crawler/mq/exec/collectQueueMetrics.ts b/packages/crawler/mq/exec/collectQueueMetrics.ts new file mode 100644 index 0000000..03dbf9b --- /dev/null +++ b/packages/crawler/mq/exec/collectQueueMetrics.ts @@ -0,0 +1,18 @@ +import { queueJobsCounter } from "metrics"; +import { SnapshotQueue } from "mq"; + +export const collectQueueMetrics = async () => { + const counts = await SnapshotQueue.getJobCounts(); + const waiting = counts?.waiting; + const prioritized = counts?.prioritized; + const active = counts?.active; + const completed = counts?.completed; + const failed = counts?.failed; + const delayed = counts?.delayed; + waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" }); + prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" }); + active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" }); + completed && queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" }); + failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" }); + delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" }); +} \ No newline at end of file diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index 53d3b31..85a72e1 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -1,14 +1,7 @@ import { sql } from "@core/db/dbNew"; import { Job } from "bullmq"; -import { queueJobsCounter } from "metrics"; -import { SnapshotQueue } from "mq"; import { queueLatestVideos } from "mq/task/queueLatestVideo"; export const getLatestVideosWorker = async (_job: Job): Promise => { await queueLatestVideos(sql); - const counts = await SnapshotQueue.getJobCounts(); - const waiting = counts?.waiting; - const prioritized = counts?.prioritized; - waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" }); - prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" }); }; diff --git a/packages/crawler/mq/index.ts b/packages/crawler/mq/index.ts index 869e716..0d90829 100644 --- a/packages/crawler/mq/index.ts +++ b/packages/crawler/mq/index.ts @@ -12,3 +12,7 @@ export const ClassifyVideoQueue = new Queue("classifyVideo", { export const SnapshotQueue = new Queue("snapshot", { connection: redis as ConnectionOptions }); + +export const MiscQueue = new Queue("misc", { + connection: redis as ConnectionOptions +}); \ No newline at end of file diff --git a/packages/crawler/mq/init.ts b/packages/crawler/mq/init.ts index fb94ed8..3adf66c 100644 --- a/packages/crawler/mq/init.ts +++ b/packages/crawler/mq/init.ts @@ -1,5 +1,5 @@ import { HOUR, MINUTE, SECOND } from "@core/lib"; -import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index"; +import { ClassifyVideoQueue, LatestVideosQueue, MiscQueue, SnapshotQueue } from "mq/index"; import logger from "@core/log"; import { initSnapshotWindowCounts } from "db/snapshotSchedule"; import { redis } from "@core/db/redis"; @@ -71,5 +71,10 @@ export async function initMQ() { immediately: true }); + await MiscQueue.upsertJobScheduler("collectQueueMetrics", { + every: 10 * SECOND, + immediately: true + }); + logger.log("Message queue initialized."); } diff --git a/packages/crawler/src/bullui.ts b/packages/crawler/src/bullui.ts index da86d71..868a705 100644 --- a/packages/crawler/src/bullui.ts +++ b/packages/crawler/src/bullui.ts @@ -2,7 +2,7 @@ import express from "express"; import { createBullBoard } from "@bull-board/api"; import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js"; import { ExpressAdapter } from "@bull-board/express"; -import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index"; +import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue, MiscQueue } from "mq/index"; const serverAdapter = new ExpressAdapter(); serverAdapter.setBasePath("/"); @@ -11,7 +11,8 @@ createBullBoard({ queues: [ new BullMQAdapter(LatestVideosQueue), new BullMQAdapter(ClassifyVideoQueue), - new BullMQAdapter(SnapshotQueue) + new BullMQAdapter(SnapshotQueue), + new BullMQAdapter(MiscQueue) ], serverAdapter: serverAdapter }); diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 7dc42d6..19a10eb 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -16,6 +16,7 @@ import { redis } from "@core/db/redis"; import logger from "@core/log"; import { lockManager } from "@core/mq/lockManager"; import { WorkerError } from "mq/schema"; +import { collectQueueMetrics } from "mq/exec/collectQueueMetrics"; const releaseLockForJob = async (name: string) => { await lockManager.releaseLock(name); @@ -34,6 +35,7 @@ const shutdown = async (signal: string) => { await releaseAllLocks(); await latestVideoWorker.close(true); await snapshotWorker.close(true); + await miscWorker.close(true); process.exit(0); }; @@ -102,3 +104,21 @@ snapshotWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); }); + +const miscWorker = new Worker( + "misc", + async (job: Job) => { + switch (job.name) { + case "collectQueueMetrics": + return await collectQueueMetrics(); + default: + break; + } + }, + { connection: redis as ConnectionOptions, concurrency: 5, removeOnComplete: { count: 1000 } } +); + +miscWorker.on("error", (err) => { + const e = err as WorkerError; + logger.error(e.rawError, e.service, e.codePath); +}); \ No newline at end of file