Compare commits

...

2 Commits

21 changed files with 182 additions and 173 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project version="4"> <project version="4">
<component name="DenoSettings"> <component name="DenoSettings">
<option name="useDenoValue" value="ENABLE" /> <option name="denoInit" value="{&#10; &quot;enable&quot;: true,&#10; &quot;lint&quot;: true,&#10; &quot;unstable&quot;: true,&#10; &quot;importMap&quot;: &quot;import_map.json&quot;,&#10; &quot;config&quot;: &quot;deno.json&quot;,&#10; &quot;fmt&quot;: {&#10; &quot;useTabs&quot;: true,&#10; &quot;lineWidth&quot;: 120,&#10; &quot;indentWidth&quot;: 4,&#10; &quot;semiColons&quot;: true,&#10; &quot;proseWrap&quot;: &quot;always&quot;&#10; }&#10;}" />
</component> </component>
</project> </project>

View File

@ -3,33 +3,33 @@
// For a full list of overridable settings, and general information on folder-specific settings, // For a full list of overridable settings, and general information on folder-specific settings,
// see the documentation: https://zed.dev/docs/configuring-zed#settings-files // see the documentation: https://zed.dev/docs/configuring-zed#settings-files
{ {
"lsp": { "lsp": {
"deno": { "deno": {
"settings": { "settings": {
"deno": { "deno": {
"enable": true "enable": true
} }
} }
} }
}, },
"languages": { "languages": {
"TypeScript": { "TypeScript": {
"language_servers": [ "language_servers": [
"deno", "deno",
"!typescript-language-server", "!typescript-language-server",
"!vtsls", "!vtsls",
"!eslint" "!eslint"
], ],
"formatter": "language_server" "formatter": "language_server"
}, },
"TSX": { "TSX": {
"language_servers": [ "language_servers": [
"deno", "deno",
"!typescript-language-server", "!typescript-language-server",
"!vtsls", "!vtsls",
"!eslint" "!eslint"
], ],
"formatter": "language_server" "formatter": "language_server"
} }
} }
} }

View File

@ -10,7 +10,6 @@ import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import type { BlankEnv, BlankInput } from "hono/types"; import type { BlankEnv, BlankInput } from "hono/types";
import type { VideoInfoData } from "@core/net/bilibili.d.ts"; import type { VideoInfoData } from "@core/net/bilibili.d.ts";
const redis = new Redis({ maxRetriesPerRequest: null }); const redis = new Redis({ maxRetriesPerRequest: null });
const CACHE_EXPIRATION_SECONDS = 60; const CACHE_EXPIRATION_SECONDS = 60;
@ -39,7 +38,6 @@ async function insertVideoSnapshot(client: Client, data: VideoInfoData) {
logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot"); logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot");
} }
export const videoInfoHandler = createHandlers(async (c: ContextType) => { export const videoInfoHandler = createHandlers(async (c: ContextType) => {
const client = c.get("db"); const client = c.get("db");
try { try {

View File

@ -1,8 +1,8 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { SnapshotScheduleType } from "@core/db/schema"; import { SnapshotScheduleType } from "@core/db/schema";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { MINUTE } from "$std/datetime/constants.ts"; import { MINUTE } from "@std/datetime";
import { redis } from "../../core/db/redis.ts"; import { redis } from "@core/db/redis.ts";
import { Redis } from "ioredis"; import { Redis } from "ioredis";
const REDIS_KEY = "cvsa:snapshot_window_counts"; const REDIS_KEY = "cvsa:snapshot_window_counts";

View File

@ -23,6 +23,7 @@
"imports": { "imports": {
"@std/assert": "jsr:@std/assert@1", "@std/assert": "jsr:@std/assert@1",
"$std/": "https://deno.land/std@0.216.0/", "$std/": "https://deno.land/std@0.216.0/",
"@std/datetime": "jsr:@std/datetime@^0.225.4",
"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0", "@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
"bullmq": "npm:bullmq", "bullmq": "npm:bullmq",
"mq/": "./mq/", "mq/": "./mq/",

View File

@ -8,7 +8,7 @@ import { lockManager } from "mq/lockManager.ts";
import { aidExistsInSongs } from "db/songs.ts"; import { aidExistsInSongs } from "db/songs.ts";
import { insertIntoSongs } from "mq/task/collectSongs.ts"; import { insertIntoSongs } from "mq/task/collectSongs.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts"; import { scheduleSnapshot } from "db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts"; import { MINUTE } from "@std/datetime";
export const classifyVideoWorker = async (job: Job) => { export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect(); const client = await db.connect();

View File

@ -0,0 +1,9 @@
import { Job } from "npm:bullmq@5.45.2";
import { collectSongs } from "mq/task/collectSongs.ts";
import { withDbConnection } from "db/withConnection.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
export const collectSongsWorker = (_job: Job): Promise<void> =>
withDbConnection(async (client: Client) => {
await collectSongs(client);
});

View File

@ -0,0 +1,4 @@
export * from "mq/exec/getLatestVideos.ts";
export * from "./getVideoInfo.ts";
export * from "./collectSongs.ts";
export * from "./takeBulkSnapshot.ts";

View File

@ -1,37 +1,10 @@
import { Job } from "bullmq"; import { Job } from "bullmq";
import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
import { db } from "db/init.ts"; import { withDbConnection } from "db/withConnection.ts";
import { insertVideoInfo } from "mq/task/getVideoDetails.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { collectSongs } from "mq/task/collectSongs.ts";
export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
const client = await db.connect();
try {
await queueLatestVideos(client);
} finally {
client.release();
}
};
export const collectSongsWorker = async (_job: Job): Promise<void> => { export const getLatestVideosWorker = (_job: Job): Promise<void> =>
const client = await db.connect(); withDbConnection(async (client: Client) => {
try { await queueLatestVideos(client)
await collectSongs(client); });
} finally {
client.release();
}
};
export const getVideoInfoWorker = async (job: Job): Promise<number> => {
const client = await db.connect();
try {
const aid = job.data.aid;
if (!aid) {
return 3;
}
await insertVideoInfo(client, aid);
return 0;
} finally {
client.release();
}
};

View File

@ -0,0 +1,15 @@
import { Job } from "npm:bullmq@5.45.2";
import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
import { withDbConnection } from "db/withConnection.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import logger from "log/logger.ts";
export const getVideoInfoWorker = async (job: Job): Promise<void> =>
await withDbConnection<void>(async (client: Client) => {
const aid = job.data.aid;
if (!aid) {
logger.warn("aid does not exists", "mq", "job:getVideoInfo");
return;
}
await insertVideoInfo(client, aid)
});

View File

@ -3,12 +3,8 @@ import { db } from "db/init.ts";
import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts"; import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
import { import {
bulkGetVideosWithoutProcessingSchedules, bulkGetVideosWithoutProcessingSchedules,
bulkScheduleSnapshot,
bulkSetSnapshotStatus, bulkSetSnapshotStatus,
findClosestSnapshot,
findSnapshotBefore,
getBulkSnapshotsInNextSecond, getBulkSnapshotsInNextSecond,
getLatestSnapshot,
getSnapshotsInNextSecond, getSnapshotsInNextSecond,
getVideosWithoutActiveSnapshotSchedule, getVideosWithoutActiveSnapshotSchedule,
scheduleSnapshot, scheduleSnapshot,
@ -16,8 +12,7 @@ import {
snapshotScheduleExists, snapshotScheduleExists,
videoHasProcessingSchedule, videoHasProcessingSchedule,
} from "db/snapshotSchedule.ts"; } from "db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { HOUR, MINUTE, SECOND, WEEK } from "@std/datetime";
import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { SnapshotQueue } from "mq/index.ts"; import { SnapshotQueue } from "mq/index.ts";
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts"; import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
@ -26,8 +21,8 @@ import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
import { truncate } from "utils/truncate.ts"; import { truncate } from "utils/truncate.ts";
import { lockManager } from "mq/lockManager.ts"; import { lockManager } from "mq/lockManager.ts";
import { getSongsPublihsedAt } from "db/songs.ts"; import { getSongsPublihsedAt } from "db/songs.ts";
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
import { getAdjustedShortTermETA } from "../scheduling.ts"; import { getAdjustedShortTermETA } from "../scheduling.ts";
import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
const priorityMap: { [key: string]: number } = { const priorityMap: { [key: string]: number } = {
"milestone": 1, "milestone": 1,
@ -61,7 +56,7 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
map: dataMap, map: dataMap,
}, { priority: 3 }); }, { priority: 3 });
} }
return `OK` return `OK`;
} catch (e) { } catch (e) {
logger.error(e as Error); logger.error(e as Error);
} finally { } finally {
@ -127,25 +122,6 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
} }
}; };
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
const now = Date.now();
const date = new Date(now - 24 * HOUR);
let oldSnapshot = await findSnapshotBefore(client, aid, date);
if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
const latestSnapshot = await getLatestSnapshot(client, aid);
if (!oldSnapshot || !latestSnapshot) return 0;
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
if (hoursDiff < 8) return 24;
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
if (viewsDiff === 0) return 72;
const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
if (speedPerDay < 6) return 36;
if (speedPerDay < 120) return 24;
if (speedPerDay < 320) return 12;
return 6;
};
export const regularSnapshotsWorker = async (_job: Job) => { export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect(); const client = await db.connect();
const startedAt = Date.now(); const startedAt = Date.now();
@ -178,72 +154,6 @@ export const regularSnapshotsWorker = async (_job: Job) => {
} }
}; };
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
const dataMap: { [key: number]: number } = job.data.map;
const ids = Object.keys(dataMap).map((id) => Number(id));
const aidsToFetch: number[] = [];
const client = await db.connect();
try {
for (const id of ids) {
const aid = Number(dataMap[id]);
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
continue;
}
aidsToFetch.push(aid);
}
const data = await bulkGetVideoStats(aidsToFetch);
if (typeof data === "number") {
await bulkSetSnapshotStatus(client, ids, "failed");
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
return `GET_BILI_STATUS_${data}`;
}
for (const video of data) {
const aid = video.id;
const stat = video.cnt_info;
const views = stat.play;
const danmakus = stat.danmaku;
const replies = stat.reply;
const likes = stat.thumb_up;
const coins = stat.coin;
const shares = stat.share;
const favorites = stat.collect;
const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`;
await client.queryObject(
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
}
await bulkSetSnapshotStatus(client, ids, "completed");
for (const aid of aidsToFetch) {
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
}
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for bulk request now.`,
"mq",
"fn:takeBulkSnapshotForVideosWorker",
);
await bulkSetSnapshotStatus(client, ids, "completed");
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
return;
}
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
await bulkSetSnapshotStatus(client, ids, "failed");
} finally {
client.release();
}
};
export const takeSnapshotForVideoWorker = async (job: Job) => { export const takeSnapshotForVideoWorker = async (job: Job) => {
const id = job.data.id; const id = job.data.id;
const aid = Number(job.data.aid); const aid = Number(job.data.aid);

View File

@ -0,0 +1,79 @@
import { Job } from "npm:bullmq@5.45.2";
import { db } from "db/init.ts";
import {
bulkScheduleSnapshot,
bulkSetSnapshotStatus,
scheduleSnapshot,
snapshotScheduleExists,
} from "db/snapshotSchedule.ts";
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
import logger from "log/logger.ts";
import { NetSchedulerError } from "@core/net/delegate.ts";
import { HOUR, MINUTE, SECOND } from "@std/datetime";
import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
const dataMap: { [key: number]: number } = job.data.map;
const ids = Object.keys(dataMap).map((id) => Number(id));
const aidsToFetch: number[] = [];
const client = await db.connect();
try {
for (const id of ids) {
const aid = Number(dataMap[id]);
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
continue;
}
aidsToFetch.push(aid);
}
const data = await bulkGetVideoStats(aidsToFetch);
if (typeof data === "number") {
await bulkSetSnapshotStatus(client, ids, "failed");
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
return `GET_BILI_STATUS_${data}`;
}
for (const video of data) {
const aid = video.id;
const stat = video.cnt_info;
const views = stat.play;
const danmakus = stat.danmaku;
const replies = stat.reply;
const likes = stat.thumb_up;
const coins = stat.coin;
const shares = stat.share;
const favorites = stat.collect;
const query: string = `
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`;
await client.queryObject(
query,
[aid, views, danmakus, replies, likes, coins, shares, favorites],
);
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
}
await bulkSetSnapshotStatus(client, ids, "completed");
for (const aid of aidsToFetch) {
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
}
return `DONE`;
} catch (e) {
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
logger.warn(
`No available proxy for bulk request now.`,
"mq",
"fn:takeBulkSnapshotForVideosWorker",
);
await bulkSetSnapshotStatus(client, ids, "completed");
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
return;
}
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
await bulkSetSnapshotStatus(client, ids, "failed");
} finally {
client.release();
}
};

View File

@ -1 +0,0 @@
export * from "mq/exec/getLatestVideos.ts";

View File

@ -1,4 +1,4 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts"; import { MINUTE, SECOND } from "@std/datetime";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts"; import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts"; import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";

View File

@ -8,7 +8,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db
import { truncate } from "utils/truncate.ts"; import { truncate } from "utils/truncate.ts";
import { closetMilestone } from "./exec/snapshotTick.ts"; import { closetMilestone } from "./exec/snapshotTick.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR, MINUTE } from "$std/datetime/constants.ts"; import { HOUR, MINUTE } from "@std/datetime";
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base); const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);

View File

@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts"; import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { scheduleSnapshot } from "db/snapshotSchedule.ts"; import { scheduleSnapshot } from "db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts"; import { MINUTE } from "@std/datetime";
export async function collectSongs(client: Client) { export async function collectSongs(client: Client) {
const aids = await getNotCollectedSongs(client); const aids = await getNotCollectedSongs(client);

View File

@ -4,7 +4,7 @@ import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { ClassifyVideoQueue } from "mq/index.ts"; import { ClassifyVideoQueue } from "mq/index.ts";
import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts"; import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
import { HOUR, SECOND } from "$std/datetime/constants.ts"; import { HOUR, SECOND } from "@std/datetime";
export async function insertVideoInfo(client: Client, aid: number) { export async function insertVideoInfo(client: Client, aid: number) {
const videoExists = await videoExistsInAllData(client, aid); const videoExists = await videoExistsInAllData(client, aid);

View File

@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { getLatestVideoAids } from "net/getLatestVideoAids.ts"; import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
import { videoExistsInAllData } from "db/allData.ts"; import { videoExistsInAllData } from "db/allData.ts";
import { sleep } from "utils/sleep.ts"; import { sleep } from "utils/sleep.ts";
import { SECOND } from "$std/datetime/constants.ts"; import { SECOND } from "@std/datetime";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { LatestVideosQueue } from "mq/index.ts"; import { LatestVideosQueue } from "mq/index.ts";

View File

@ -0,0 +1,22 @@
import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { HOUR } from "@std/datetime";
export const getRegularSnapshotInterval = async (client: Client, aid: number) => {
const now = Date.now();
const date = new Date(now - 24 * HOUR);
let oldSnapshot = await findSnapshotBefore(client, aid, date);
if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
const latestSnapshot = await getLatestSnapshot(client, aid);
if (!oldSnapshot || !latestSnapshot) return 0;
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
if (hoursDiff < 8) return 24;
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
if (viewsDiff === 0) return 72;
const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
if (speedPerDay < 6) return 36;
if (speedPerDay < 120) return 24;
if (speedPerDay < 320) return 12;
return 6;
};

View File

@ -1,19 +1,18 @@
import { ConnectionOptions, Job, Worker } from "bullmq"; import { ConnectionOptions, Job, Worker } from "bullmq";
import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts"; import { collectSongsWorker, getLatestVideosWorker, getVideoInfoWorker } from "mq/exec/executors.ts";
import { redis } from "../../core/db/redis.ts"; import { redis } from "@core/db/redis.ts";
import logger from "log/logger.ts"; import logger from "log/logger.ts";
import { lockManager } from "mq/lockManager.ts"; import { lockManager } from "mq/lockManager.ts";
import { WorkerError } from "mq/schema.ts"; import { WorkerError } from "mq/schema.ts";
import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
import { import {
bulkSnapshotTickWorker, bulkSnapshotTickWorker,
collectMilestoneSnapshotsWorker, collectMilestoneSnapshotsWorker,
regularSnapshotsWorker, regularSnapshotsWorker,
scheduleCleanupWorker, scheduleCleanupWorker,
snapshotTickWorker, snapshotTickWorker,
takeBulkSnapshotForVideosWorker,
takeSnapshotForVideoWorker, takeSnapshotForVideoWorker,
} from "mq/exec/snapshotTick.ts"; } from "mq/exec/snapshotTick.ts";
import {takeBulkSnapshotForVideosWorker} from "mq/exec/executors.ts";
Deno.addSignalListener("SIGINT", async () => { Deno.addSignalListener("SIGINT", async () => {
logger.log("SIGINT Received: Shutting down workers...", "mq"); logger.log("SIGINT Received: Shutting down workers...", "mq");