update: collection of metrics of queues
This commit is contained in:
parent
125a30f0df
commit
9ba1a0c6f0
18
packages/crawler/mq/exec/collectQueueMetrics.ts
Normal file
18
packages/crawler/mq/exec/collectQueueMetrics.ts
Normal file
@ -0,0 +1,18 @@
|
||||
import { queueJobsCounter } from "metrics";
|
||||
import { SnapshotQueue } from "mq";
|
||||
|
||||
export const collectQueueMetrics = async () => {
|
||||
const counts = await SnapshotQueue.getJobCounts();
|
||||
const waiting = counts?.waiting;
|
||||
const prioritized = counts?.prioritized;
|
||||
const active = counts?.active;
|
||||
const completed = counts?.completed;
|
||||
const failed = counts?.failed;
|
||||
const delayed = counts?.delayed;
|
||||
waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" });
|
||||
prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" });
|
||||
active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" });
|
||||
completed && queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" });
|
||||
failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" });
|
||||
delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" });
|
||||
}
|
||||
@ -1,14 +1,7 @@
|
||||
import { sql } from "@core/db/dbNew";
|
||||
import { Job } from "bullmq";
|
||||
import { queueJobsCounter } from "metrics";
|
||||
import { SnapshotQueue } from "mq";
|
||||
import { queueLatestVideos } from "mq/task/queueLatestVideo";
|
||||
|
||||
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
|
||||
await queueLatestVideos(sql);
|
||||
const counts = await SnapshotQueue.getJobCounts();
|
||||
const waiting = counts?.waiting;
|
||||
const prioritized = counts?.prioritized;
|
||||
waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" });
|
||||
prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" });
|
||||
};
|
||||
|
||||
@ -12,3 +12,7 @@ export const ClassifyVideoQueue = new Queue("classifyVideo", {
|
||||
export const SnapshotQueue = new Queue("snapshot", {
|
||||
connection: redis as ConnectionOptions
|
||||
});
|
||||
|
||||
export const MiscQueue = new Queue("misc", {
|
||||
connection: redis as ConnectionOptions
|
||||
});
|
||||
@ -1,5 +1,5 @@
|
||||
import { HOUR, MINUTE, SECOND } from "@core/lib";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, MiscQueue, SnapshotQueue } from "mq/index";
|
||||
import logger from "@core/log";
|
||||
import { initSnapshotWindowCounts } from "db/snapshotSchedule";
|
||||
import { redis } from "@core/db/redis";
|
||||
@ -71,5 +71,10 @@ export async function initMQ() {
|
||||
immediately: true
|
||||
});
|
||||
|
||||
await MiscQueue.upsertJobScheduler("collectQueueMetrics", {
|
||||
every: 10 * SECOND,
|
||||
immediately: true
|
||||
});
|
||||
|
||||
logger.log("Message queue initialized.");
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ import express from "express";
|
||||
import { createBullBoard } from "@bull-board/api";
|
||||
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter.js";
|
||||
import { ExpressAdapter } from "@bull-board/express";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index";
|
||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue, MiscQueue } from "mq/index";
|
||||
|
||||
const serverAdapter = new ExpressAdapter();
|
||||
serverAdapter.setBasePath("/");
|
||||
@ -11,7 +11,8 @@ createBullBoard({
|
||||
queues: [
|
||||
new BullMQAdapter(LatestVideosQueue),
|
||||
new BullMQAdapter(ClassifyVideoQueue),
|
||||
new BullMQAdapter(SnapshotQueue)
|
||||
new BullMQAdapter(SnapshotQueue),
|
||||
new BullMQAdapter(MiscQueue)
|
||||
],
|
||||
serverAdapter: serverAdapter
|
||||
});
|
||||
|
||||
@ -16,6 +16,7 @@ import { redis } from "@core/db/redis";
|
||||
import logger from "@core/log";
|
||||
import { lockManager } from "@core/mq/lockManager";
|
||||
import { WorkerError } from "mq/schema";
|
||||
import { collectQueueMetrics } from "mq/exec/collectQueueMetrics";
|
||||
|
||||
const releaseLockForJob = async (name: string) => {
|
||||
await lockManager.releaseLock(name);
|
||||
@ -34,6 +35,7 @@ const shutdown = async (signal: string) => {
|
||||
await releaseAllLocks();
|
||||
await latestVideoWorker.close(true);
|
||||
await snapshotWorker.close(true);
|
||||
await miscWorker.close(true);
|
||||
process.exit(0);
|
||||
};
|
||||
|
||||
@ -102,3 +104,21 @@ snapshotWorker.on("error", (err) => {
|
||||
const e = err as WorkerError;
|
||||
logger.error(e.rawError, e.service, e.codePath);
|
||||
});
|
||||
|
||||
const miscWorker = new Worker(
|
||||
"misc",
|
||||
async (job: Job) => {
|
||||
switch (job.name) {
|
||||
case "collectQueueMetrics":
|
||||
return await collectQueueMetrics();
|
||||
default:
|
||||
break;
|
||||
}
|
||||
},
|
||||
{ connection: redis as ConnectionOptions, concurrency: 5, removeOnComplete: { count: 1000 } }
|
||||
);
|
||||
|
||||
miscWorker.on("error", (err) => {
|
||||
const e = err as WorkerError;
|
||||
logger.error(e.rawError, e.service, e.codePath);
|
||||
});
|
||||
Loading…
Reference in New Issue
Block a user