Compare commits


No commits in common. "0930bbe6f46f5169f8c060f6a4120d84768fd75e" and "b080c51c3e8d4158a7c7227f05edfc032ade73aa" have entirely different histories.

12 changed files with 42 additions and 137 deletions

View File

@@ -3,10 +3,10 @@ import { VERSION } from "./main.ts";
 import { createHandlers } from "./utils.ts";
 
 export const rootHandler = createHandlers((c) => {
-	let singer: Singer | Singer[];
+	let singer: Singer | Singer[] | null;
 	const shouldShowSpecialSinger = Math.random() < 0.016;
 	if (getSingerForBirthday().length !== 0) {
-		singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
+		singer = getSingerForBirthday();
 		for (const s of singer) {
 			delete s.birthday;
 			s.message = `${s.name}生日快乐~`;

View File

@@ -6,16 +6,8 @@ export async function getVideosNearMilestone(client: Client)
 	const queryResult = await client.queryObject<LatestSnapshotType>(`
 		SELECT ls.*
 		FROM latest_video_snapshot ls
-		RIGHT JOIN songs ON songs.aid = ls.aid
 		WHERE
-			(views >= 50000 AND views < 100000) OR
-			(views >= 900000 AND views < 1000000) OR
-			(views >= 9900000 AND views < 10000000)
-		UNION
-		SELECT ls.*
-		FROM latest_video_snapshot ls
-		WHERE
-			(views >= 90000 AND views < 100000) OR
+			views < 100000 OR
 			(views >= 900000 AND views < 1000000) OR
 			(views >= 9900000 AND views < 10000000)
 	`);

View File

@@ -1,7 +1,7 @@
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { SnapshotScheduleType } from "@core/db/schema";
 import logger from "log/logger.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
+import { MINUTE } from "@std/datetime";
 import { redis } from "@core/db/redis.ts";
 import { Redis } from "ioredis";
@@ -274,14 +274,8 @@ export async function getBulkSnapshotsInNextSecond(client: Client) {
 	const query = `
 		SELECT *
 		FROM snapshot_schedule
-		WHERE (started_at <= NOW() + INTERVAL '15 seconds')
-			AND status = 'pending'
-			AND (type = 'normal' OR type = 'archive')
-		ORDER BY CASE
-			WHEN type = 'normal' THEN 1
-			WHEN type = 'archive' THEN 2
-		END,
-			started_at
+		WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
+		ORDER BY started_at
 		LIMIT 1000;
 	`;
 	const res = await client.queryObject<SnapshotScheduleType>(query, []);
@@ -312,14 +306,3 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
 	const res = await client.queryObject<{ aid: number }>(query, []);
 	return res.rows.map((r) => Number(r.aid));
 }
-
-export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
-	const query: string = `
-		SELECT s.aid
-		FROM bilibili_metadata s
-		LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
-		WHERE ss.aid IS NULL
-	`;
-	const res = await client.queryObject<{ aid: number }>(query, []);
-	return res.rows.map((r) => Number(r.aid));
-}

View File

@@ -12,7 +12,7 @@
 		"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
 		"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
 		"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
-		"all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
+		"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
 		"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
 	},
 	"lint": {

View File

@@ -1,40 +0,0 @@
-import { Job } from "npm:bullmq@5.45.2";
-import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
-import { withDbConnection } from "db/withConnection.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import logger from "log/logger.ts";
-import { lockManager } from "mq/lockManager.ts";
-import { getLatestVideoSnapshot } from "db/snapshot.ts";
-import { HOUR, MINUTE } from "$std/datetime/constants.ts";
-
-export const archiveSnapshotsWorker = async (_job: Job) =>
-	await withDbConnection<void>(async (client: Client) => {
-		const startedAt = Date.now();
-		if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
-			logger.log("dispatchArchiveSnapshots is already running", "mq");
-			return;
-		}
-		await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
-		const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
-		for (const rawAid of aids) {
-			const aid = Number(rawAid);
-			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
-			const now = Date.now();
-			const lastSnapshotedAt = latestSnapshot?.time ?? now;
-			const interval = 168;
-			logger.log(
-				`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
-				"mq",
-				"fn:archiveSnapshotsWorker",
-			);
-			const targetTime = lastSnapshotedAt + interval * HOUR;
-			await scheduleSnapshot(client, aid, "archive", targetTime);
-			if (now - startedAt > 250 * MINUTE) {
-				return;
-			}
-		}
-	}, (e) => {
-		logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
-	}, async () => {
-		await lockManager.releaseLock("dispatchArchiveSnapshots");
-	});

View File

@@ -2,4 +2,3 @@ export * from "mq/exec/getLatestVideos.ts";
 export * from "./getVideoInfo.ts";
 export * from "./collectSongs.ts";
 export * from "./takeBulkSnapshot.ts";
-export * from "./archiveSnapshots.ts";

View File

@@ -3,7 +3,8 @@ import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
 import { withDbConnection } from "db/withConnection.ts";
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 
 export const getLatestVideosWorker = (_job: Job): Promise<void> =>
 	withDbConnection(async (client: Client) => {
-		await queueLatestVideos(client);
+		await queueLatestVideos(client)
 	});

View File

@@ -11,5 +11,5 @@ export const getVideoInfoWorker = async (job: Job): Promise<void> =>
 		logger.warn("aid does not exists", "mq", "job:getVideoInfo");
 		return;
 	}
-	await insertVideoInfo(client, aid);
+	await insertVideoInfo(client, aid)
 });

View File

@@ -47,19 +47,13 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
 		const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
 		if (filteredAids.length === 0) continue;
 		await bulkSetSnapshotStatus(client, filteredAids, "processing");
-		const schedulesData = group.map((schedule) => {
-			return {
-				aid: Number(schedule.aid),
-				id: Number(schedule.id),
-				type: schedule.type,
-				created_at: schedule.created_at,
-				started_at: schedule.started_at,
-				finished_at: schedule.finished_at,
-				status: schedule.status,
-			};
-		});
+		const dataMap: { [key: number]: number } = {};
+		for (const schedule of group) {
+			const id = Number(schedule.id);
+			dataMap[id] = Number(schedule.aid);
+		}
 		await SnapshotQueue.add("bulkSnapshotVideo", {
-			schedules: schedulesData,
+			map: dataMap,
 		}, { priority: 3 });
 	}
 	return `OK`;
@@ -85,7 +79,7 @@ export const snapshotTickWorker = async (_job: Job) => {
 			const aid = Number(schedule.aid);
 			await setSnapshotStatus(client, schedule.id, "processing");
 			await SnapshotQueue.add("snapshotVideo", {
-				aid: Number(aid),
+				aid: aid,
 				id: Number(schedule.id),
 				type: schedule.type ?? "normal",
 			}, { priority });
@@ -228,7 +222,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
 				"mq",
 				"fn:takeSnapshotForVideoWorker",
 			);
-			await setSnapshotStatus(client, id, "no_proxy");
+			await setSnapshotStatus(client, id, "completed");
 			await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
 			return;
 		}

View File

@@ -11,17 +11,15 @@ import logger from "log/logger.ts";
 import { NetSchedulerError } from "@core/net/delegate.ts";
 import { HOUR, MINUTE, SECOND } from "@std/datetime";
 import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
-import { SnapshotScheduleType } from "@core/db/schema";
 
 export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const schedules: SnapshotScheduleType[] = job.data.schedules;
-	const ids = schedules.map((schedule) => Number(schedule.id));
+	const dataMap: { [key: number]: number } = job.data.map;
+	const ids = Object.keys(dataMap).map((id) => Number(id));
 	const aidsToFetch: number[] = [];
 	const client = await db.connect();
 	try {
-		for (const schedule of schedules) {
-			const aid = Number(schedule.aid);
-			const id = Number(schedule.id);
+		for (const id of ids) {
+			const aid = Number(dataMap[id]);
 			const exists = await snapshotScheduleExists(client, id);
 			if (!exists) {
 				continue;
@@ -56,11 +54,7 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
 		}
 		await bulkSetSnapshotStatus(client, ids, "completed");
-		for (const schedule of schedules) {
-			const aid = Number(schedule.aid);
-			const type = schedule.type;
-			if (type == "archive") continue;
+		for (const aid of aidsToFetch) {
 			const interval = await getRegularSnapshotInterval(client, aid);
 			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
 			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
@@ -73,8 +67,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
 				"mq",
 				"fn:takeBulkSnapshotForVideosWorker",
 			);
-			await bulkSetSnapshotStatus(client, ids, "no_proxy");
-			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
+			await bulkSetSnapshotStatus(client, ids, "completed");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
 			return;
 		}
 		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");

View File

@@ -1,4 +1,4 @@
-import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
+import { MINUTE, SECOND } from "@std/datetime";
 import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
 import logger from "log/logger.ts";
 import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
@@ -55,11 +55,6 @@ export async function initMQ() {
 		immediately: true,
 	});
 
-	await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
-		every: 6 * HOUR,
-		immediately: true,
-	});
-
 	await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
 		every: 30 * MINUTE,
 		immediately: true,

View File

@@ -1,11 +1,5 @@
 import { ConnectionOptions, Job, Worker } from "bullmq";
-import {
-	archiveSnapshotsWorker,
-	collectSongsWorker,
-	getLatestVideosWorker,
-	getVideoInfoWorker,
-	takeBulkSnapshotForVideosWorker,
-} from "mq/exec/executors.ts";
+import { collectSongsWorker, getLatestVideosWorker, getVideoInfoWorker } from "mq/exec/executors.ts";
 import { redis } from "@core/db/redis.ts";
 import logger from "log/logger.ts";
 import { lockManager } from "mq/lockManager.ts";
@@ -18,22 +12,10 @@ import {
 	snapshotTickWorker,
 	takeSnapshotForVideoWorker,
 } from "mq/exec/snapshotTick.ts";
-
-const releaseLockForJob = async (name: string) => {
-	await lockManager.releaseLock(name);
-	logger.log(`Released lock: ${name}`, "mq");
-};
-
-const releaseAllLocks = async () => {
-	const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
-	for (const lock of locks) {
-		await releaseLockForJob(lock);
-	}
-};
+import {takeBulkSnapshotForVideosWorker} from "mq/exec/executors.ts";
 
 Deno.addSignalListener("SIGINT", async () => {
 	logger.log("SIGINT Received: Shutting down workers...", "mq");
-	await releaseAllLocks();
 	await latestVideoWorker.close(true);
 	await snapshotWorker.close(true);
 	Deno.exit();
@@ -41,7 +23,6 @@ Deno.addSignalListener("SIGINT", async () => {
 
 Deno.addSignalListener("SIGTERM", async () => {
 	logger.log("SIGTERM Received: Shutting down workers...", "mq");
-	await releaseAllLocks();
 	await latestVideoWorker.close(true);
 	await snapshotWorker.close(true);
 	Deno.exit();
@@ -78,6 +59,10 @@ latestVideoWorker.on("error", (err) => {
 	logger.error(e.rawError, e.service, e.codePath);
 });
 
+latestVideoWorker.on("closed", async () => {
+	await lockManager.releaseLock("getLatestVideos");
+});
+
 const snapshotWorker = new Worker(
 	"snapshot",
 	async (job: Job) => {
@@ -96,8 +81,6 @@ const snapshotWorker = new Worker(
 				return await takeBulkSnapshotForVideosWorker(job);
 			case "bulkSnapshotTick":
 				return await bulkSnapshotTickWorker(job);
-			case "dispatchArchiveSnapshots":
-				return await archiveSnapshotsWorker(job);
 			default:
 				break;
 		}
@@ -109,3 +92,7 @@ snapshotWorker.on("error", (err) => {
 	const e = err as WorkerError;
 	logger.error(e.rawError, e.service, e.codePath);
 });
+
+snapshotWorker.on("closed", async () => {
+	await lockManager.releaseLock("dispatchRegularSnapshots");
+});