merge: branch 'main' into ref/structure
commit 054d28e796
@@ -3,10 +3,10 @@ import { VERSION } from "./main.ts";
 import { createHandlers } from "./utils.ts";
 
 export const rootHandler = createHandlers((c) => {
-	let singer: Singer | Singer[] | null;
+	let singer: Singer | Singer[];
 	const shouldShowSpecialSinger = Math.random() < 0.016;
 	if (getSingerForBirthday().length !== 0) {
-		singer = getSingerForBirthday();
+		singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
 		for (const s of singer) {
 			delete s.birthday;
 			s.message = `祝${s.name}生日快乐~`;
@@ -6,8 +6,16 @@ export async function getVideosNearMilestone(client: Client) {
 	const queryResult = await client.queryObject<LatestSnapshotType>(`
 		SELECT ls.*
 		FROM latest_video_snapshot ls
+		RIGHT JOIN songs ON songs.aid = ls.aid
 		WHERE
-			views < 100000 OR
+			(views >= 50000 AND views < 100000) OR
+			(views >= 900000 AND views < 1000000) OR
+			(views >= 9900000 AND views < 10000000)
+		UNION
+		SELECT ls.*
+		FROM latest_video_snapshot ls
+		WHERE
+			(views >= 90000 AND views < 100000) OR
 			(views >= 900000 AND views < 1000000) OR
 			(views >= 9900000 AND views < 10000000)
 	`);
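Note: the updated query now uses two sets of view-count windows — 50k–100k, 900k–1M, and 9.9M–10M for the arm joined against songs, plus a UNION arm over latest_video_snapshot alone with a tighter 90k–100k low band and the same upper bands. A minimal TypeScript sketch of a window test in this spirit, purely illustrative (the helper name and constant table are assumptions, not part of this commit):

// Illustrative only: a view-count window check mirroring the WHERE clauses above.
const MILESTONE_WINDOWS: Array<[number, number]> = [
	[90_000, 100_000], // just under 100k views
	[900_000, 1_000_000], // just under 1M views
	[9_900_000, 10_000_000], // just under 10M views
];

export function isNearMilestone(views: number): boolean {
	return MILESTONE_WINDOWS.some(([lo, hi]) => views >= lo && views < hi);
}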
@@ -1,7 +1,7 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
-import { MINUTE } from "@std/datetime";
+import { MINUTE } from "$std/datetime/constants.ts";
 import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";
 
@@ -272,11 +272,17 @@ export async function getSnapshotsInNextSecond(client: Client) {
 
 export async function getBulkSnapshotsInNextSecond(client: Client) {
 	const query = `
 		SELECT *
 		FROM snapshot_schedule
-		WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
-		ORDER BY started_at
-		LIMIT 1000;
+		WHERE (started_at <= NOW() + INTERVAL '15 seconds')
+			AND status = 'pending'
+			AND (type = 'normal' OR type = 'archive')
+		ORDER BY CASE
+			WHEN type = 'normal' THEN 1
+			WHEN type = 'archive' THEN 2
+		END,
+			started_at
+		LIMIT 1000;
 	`;
 	const res = await client.queryObject<SnapshotScheduleType>(query, []);
 	return res.rows;
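Note: getBulkSnapshotsInNextSecond now also picks up 'archive' schedules, and the ORDER BY CASE ranks 'normal' ahead of 'archive' before falling back to started_at, so within the 1000-row limit regular snapshots take precedence over the archive backlog. A rough in-memory equivalent of that ordering in TypeScript, illustrative only (the row shape below is assumed):

// Illustrative only: sorts rows the same way the SQL ORDER BY above does.
type ScheduleRow = { type: "normal" | "archive"; started_at: Date };

const typeRank = (t: ScheduleRow["type"]): number => (t === "normal" ? 1 : 2);

export function sortForDispatch(rows: ScheduleRow[]): ScheduleRow[] {
	return [...rows].sort((a, b) =>
		typeRank(a.type) - typeRank(b.type) ||
		a.started_at.getTime() - b.started_at.getTime()
	);
}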
@@ -306,3 +312,14 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
 	const res = await client.queryObject<{ aid: number }>(query, []);
 	return res.rows.map((r) => Number(r.aid));
 }
+
+export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
+	const query: string = `
+		SELECT s.aid
+		FROM bilibili_metadata s
+		LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
+		WHERE ss.aid IS NULL
+	`;
+	const res = await client.queryObject<{ aid: number }>(query, []);
+	return res.rows.map((r) => Number(r.aid));
+}
@@ -12,7 +12,7 @@
 		"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
 		"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
 		"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
-		"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
+		"all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
 		"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
 	},
 	"lint": {
packages/crawler/mq/exec/archiveSnapshots.ts (new file, 40 lines)
@@ -0,0 +1,40 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "log/logger.ts";
+import { lockManager } from "mq/lockManager.ts";
+import { getLatestVideoSnapshot } from "db/snapshot.ts";
+import { HOUR, MINUTE } from "$std/datetime/constants.ts";
+
+export const archiveSnapshotsWorker = async (_job: Job) =>
+	await withDbConnection<void>(async (client: Client) => {
+		const startedAt = Date.now();
+		if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
+			logger.log("dispatchArchiveSnapshots is already running", "mq");
+			return;
+		}
+		await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
+		const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
+		for (const rawAid of aids) {
+			const aid = Number(rawAid);
+			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+			const now = Date.now();
+			const lastSnapshotedAt = latestSnapshot?.time ?? now;
+			const interval = 168;
+			logger.log(
+				`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
+				"mq",
+				"fn:archiveSnapshotsWorker",
+			);
+			const targetTime = lastSnapshotedAt + interval * HOUR;
+			await scheduleSnapshot(client, aid, "archive", targetTime);
+			if (now - startedAt > 250 * MINUTE) {
+				return;
+			}
+		}
+	}, (e) => {
+		logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
+	}, async () => {
+		await lockManager.releaseLock("dispatchArchiveSnapshots");
+	});
@@ -2,3 +2,4 @@ export * from "mq/exec/getLatestVideos.ts";
 export * from "./getVideoInfo.ts";
 export * from "./collectSongs.ts";
 export * from "./takeBulkSnapshot.ts";
+export * from "./archiveSnapshots.ts";
@@ -3,8 +3,7 @@ import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
 import { withDbConnection } from "db/withConnection.ts";
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 
-
 export const getLatestVideosWorker = (_job: Job): Promise<void> =>
 	withDbConnection(async (client: Client) => {
-		await queueLatestVideos(client)
+		await queueLatestVideos(client);
 	});
@@ -11,5 +11,5 @@ export const getVideoInfoWorker = async (job: Job): Promise<void> =>
 		logger.warn("aid does not exists", "mq", "job:getVideoInfo");
 		return;
 	}
-	await insertVideoInfo(client, aid)
+	await insertVideoInfo(client, aid);
 });
@@ -4,6 +4,7 @@ import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
 import {
 	bulkGetVideosWithoutProcessingSchedules,
 	bulkSetSnapshotStatus,
+	getAllVideosWithoutActiveSnapshotSchedule,
 	getBulkSnapshotsInNextSecond,
 	getSnapshotsInNextSecond,
 	getVideosWithoutActiveSnapshotSchedule,
@@ -47,13 +48,19 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
 			const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
 			if (filteredAids.length === 0) continue;
 			await bulkSetSnapshotStatus(client, filteredAids, "processing");
-			const dataMap: { [key: number]: number } = {};
-			for (const schedule of group) {
-				const id = Number(schedule.id);
-				dataMap[id] = Number(schedule.aid);
-			}
+			const schedulesData = group.map((schedule) => {
+				return {
+					aid: Number(schedule.aid),
+					id: Number(schedule.id),
+					type: schedule.type,
+					created_at: schedule.created_at,
+					started_at: schedule.started_at,
+					finished_at: schedule.finished_at,
+					status: schedule.status,
+				};
+			});
 			await SnapshotQueue.add("bulkSnapshotVideo", {
-				map: dataMap,
+				schedules: schedulesData,
 			}, { priority: 3 });
 		}
 		return `OK`;
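Note: the bulk tick job now enqueues the full schedule rows ('schedules') instead of the previous id-to-aid map, so the consumer can read aid, type, and status directly from the payload. A sketch of the assumed payload shape under this change (the interface name is illustrative and not defined in this commit):

// Assumed shape of the "bulkSnapshotVideo" job payload after this change;
// timestamp fields are passed through exactly as the query returned them.
export interface BulkSnapshotJobData {
	schedules: Array<{
		aid: number;
		id: number;
		type: string;
		created_at: unknown;
		started_at: unknown;
		finished_at: unknown;
		status: string;
	}>;
}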
@@ -79,7 +86,7 @@ export const snapshotTickWorker = async (_job: Job) => {
 			const aid = Number(schedule.aid);
 			await setSnapshotStatus(client, schedule.id, "processing");
 			await SnapshotQueue.add("snapshotVideo", {
-				aid: aid,
+				aid: Number(aid),
 				id: Number(schedule.id),
 				type: schedule.type ?? "normal",
 			}, { priority });
@@ -222,7 +229,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
 			"mq",
 			"fn:takeSnapshotForVideoWorker",
 		);
-		await setSnapshotStatus(client, id, "completed");
+		await setSnapshotStatus(client, id, "no_proxy");
 		await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
 		return;
 	}
@@ -11,15 +11,17 @@ import logger from "log/logger.ts";
 import { NetSchedulerError } from "@core/net/delegate.ts";
 import { HOUR, MINUTE, SECOND } from "@std/datetime";
 import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
+import { SnapshotScheduleType } from "@core/db/schema";
 
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const dataMap: { [key: number]: number } = job.data.map;
-	const ids = Object.keys(dataMap).map((id) => Number(id));
+	const schedules: SnapshotScheduleType[] = job.data.schedules;
+	const ids = schedules.map((schedule) => Number(schedule.id));
 	const aidsToFetch: number[] = [];
 	const client = await db.connect();
 	try {
-		for (const id of ids) {
-			const aid = Number(dataMap[id]);
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const id = Number(schedule.id);
 			const exists = await snapshotScheduleExists(client, id);
 			if (!exists) {
 				continue;
@@ -43,8 +45,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			const shares = stat.share;
 			const favorites = stat.collect;
 			const query: string = `
				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
			`;
 			await client.queryObject(
 				query,
@@ -54,7 +56,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
 		}
 		await bulkSetSnapshotStatus(client, ids, "completed");
-		for (const aid of aidsToFetch) {
+
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const type = schedule.type;
+			if (type == "archive") continue;
 			const interval = await getRegularSnapshotInterval(client, aid);
 			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
 			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
@@ -67,8 +73,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			"mq",
 			"fn:takeBulkSnapshotForVideosWorker",
 		);
-		await bulkSetSnapshotStatus(client, ids, "completed");
-		await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
+		await bulkSetSnapshotStatus(client, ids, "no_proxy");
+		await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
 		return;
 	}
 	logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
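Note: in this failure branch the bulk statuses are now marked 'no_proxy' and the affected videos are re-queued after a random delay of up to 20 minutes rather than a fixed 2 minutes, which spreads retries out instead of re-submitting the whole batch at the same instant. A small sketch of that jitter calculation (the helper name is illustrative, not part of this commit):

// Illustrative only: jittered retry timestamp matching the expression above.
const MINUTE = 60 * 1000;

export function jitteredRetryAt(maxDelayMinutes = 20, now = Date.now()): number {
	return now + maxDelayMinutes * MINUTE * Math.random();
}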
@@ -1,4 +1,4 @@
-import { MINUTE, SECOND } from "@std/datetime";
+import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
@@ -55,6 +55,11 @@ export async function initMQ() {
 		immediately: true,
 	});
 
+	await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
+		every: 6 * HOUR,
+		immediately: true,
+	});
+
 	await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
 		every: 30 * MINUTE,
 		immediately: true,
@@ -1,5 +1,11 @@
 import { ConnectionOptions, Job, Worker } from "bullmq";
-import { collectSongsWorker, getLatestVideosWorker, getVideoInfoWorker } from "mq/exec/executors.ts";
+import {
+	archiveSnapshotsWorker,
+	collectSongsWorker,
+	getLatestVideosWorker,
+	getVideoInfoWorker,
+	takeBulkSnapshotForVideosWorker,
+} from "mq/exec/executors.ts";
 import { redis } from "@core/db/redis.ts";
 import logger from "log/logger.ts";
 import { lockManager } from "mq/lockManager.ts";
@@ -12,10 +18,22 @@ import {
 	snapshotTickWorker,
 	takeSnapshotForVideoWorker,
 } from "mq/exec/snapshotTick.ts";
-import {takeBulkSnapshotForVideosWorker} from "mq/exec/executors.ts";
+
+const releaseLockForJob = async (name: string) => {
+	await lockManager.releaseLock(name);
+	logger.log(`Released lock: ${name}`, "mq");
+};
+
+const releaseAllLocks = async () => {
+	const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
+	for (const lock of locks) {
+		await releaseLockForJob(lock);
+	}
+};
 
 Deno.addSignalListener("SIGINT", async () => {
 	logger.log("SIGINT Received: Shutting down workers...", "mq");
+	await releaseAllLocks();
 	await latestVideoWorker.close(true);
 	await snapshotWorker.close(true);
 	Deno.exit();
@@ -23,6 +41,7 @@ Deno.addSignalListener("SIGINT", async () => {
 
 Deno.addSignalListener("SIGTERM", async () => {
 	logger.log("SIGTERM Received: Shutting down workers...", "mq");
+	await releaseAllLocks();
 	await latestVideoWorker.close(true);
 	await snapshotWorker.close(true);
 	Deno.exit();
@@ -59,10 +78,6 @@ latestVideoWorker.on("error", (err) => {
 	logger.error(e.rawError, e.service, e.codePath);
 });
 
-latestVideoWorker.on("closed", async () => {
-	await lockManager.releaseLock("getLatestVideos");
-});
-
 const snapshotWorker = new Worker(
 	"snapshot",
 	async (job: Job) => {
@@ -81,6 +96,8 @@ const snapshotWorker = new Worker(
 				return await takeBulkSnapshotForVideosWorker(job);
 			case "bulkSnapshotTick":
 				return await bulkSnapshotTickWorker(job);
+			case "dispatchArchiveSnapshots":
+				return await archiveSnapshotsWorker(job);
 			default:
 				break;
 		}
@@ -92,7 +109,3 @@ snapshotWorker.on("error", (err) => {
 	const e = err as WorkerError;
 	logger.error(e.rawError, e.service, e.codePath);
 });
-
-snapshotWorker.on("closed", async () => {
-	await lockManager.releaseLock("dispatchRegularSnapshots");
-});