merge: branch 'main' into gitbook
This commit is contained in:
commit
003249ac4d
@ -17,6 +17,8 @@
|
|||||||
<excludeFolder url="file://$MODULE_DIR$/.idea" />
|
<excludeFolder url="file://$MODULE_DIR$/.idea" />
|
||||||
<excludeFolder url="file://$MODULE_DIR$/.vscode" />
|
<excludeFolder url="file://$MODULE_DIR$/.vscode" />
|
||||||
<excludeFolder url="file://$MODULE_DIR$/.zed" />
|
<excludeFolder url="file://$MODULE_DIR$/.zed" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/packages/frontend/.astro" />
|
||||||
|
<excludeFolder url="file://$MODULE_DIR$/scripts" />
|
||||||
</content>
|
</content>
|
||||||
<orderEntry type="inheritedJdk" />
|
<orderEntry type="inheritedJdk" />
|
||||||
<orderEntry type="sourceFolder" forTests="false" />
|
<orderEntry type="sourceFolder" forTests="false" />
|
||||||
|
@ -5,11 +5,14 @@
|
|||||||
"hono": "jsr:@hono/hono@^4.7.5",
|
"hono": "jsr:@hono/hono@^4.7.5",
|
||||||
"zod": "npm:zod",
|
"zod": "npm:zod",
|
||||||
"yup": "npm:yup",
|
"yup": "npm:yup",
|
||||||
"@core/": "../core/"
|
"@core/": "../core/",
|
||||||
|
"log/": "../core/log/",
|
||||||
|
"@crawler/net/videoInfo": "../crawler/net/getVideoInfo.ts",
|
||||||
|
"ioredis": "npm:ioredis"
|
||||||
},
|
},
|
||||||
"tasks": {
|
"tasks": {
|
||||||
"dev": "deno serve --env-file=.env --allow-env --allow-net --watch main.ts",
|
"dev": "deno serve --env-file=.env --allow-env --allow-net --allow-read --allow-write --allow-run --watch main.ts",
|
||||||
"start": "deno serve --env-file=.env --allow-env --allow-net --host 127.0.0.1 main.ts"
|
"start": "deno serve --env-file=.env --allow-env --allow-net --allow-read --allow-write --allow-run --host 127.0.0.1 main.ts"
|
||||||
},
|
},
|
||||||
"compilerOptions": {
|
"compilerOptions": {
|
||||||
"jsx": "precompile",
|
"jsx": "precompile",
|
||||||
|
@ -3,6 +3,7 @@ import { dbCredMiddleware, dbMiddleware } from "./database.ts";
|
|||||||
import { rootHandler } from "./root.ts";
|
import { rootHandler } from "./root.ts";
|
||||||
import { getSnapshotsHanlder } from "./snapshots.ts";
|
import { getSnapshotsHanlder } from "./snapshots.ts";
|
||||||
import { registerHandler } from "./register.ts";
|
import { registerHandler } from "./register.ts";
|
||||||
|
import { videoInfoHandler } from "./videoInfo.ts";
|
||||||
|
|
||||||
export const app = new Hono();
|
export const app = new Hono();
|
||||||
|
|
||||||
@ -14,10 +15,12 @@ app.get("/", ...rootHandler);
|
|||||||
app.get("/video/:id/snapshots", ...getSnapshotsHanlder);
|
app.get("/video/:id/snapshots", ...getSnapshotsHanlder);
|
||||||
app.post("/user", ...registerHandler);
|
app.post("/user", ...registerHandler);
|
||||||
|
|
||||||
|
app.get("/video/:id/info", ...videoInfoHandler);
|
||||||
|
|
||||||
const fetch = app.fetch;
|
const fetch = app.fetch;
|
||||||
|
|
||||||
export default {
|
export default {
|
||||||
fetch,
|
fetch,
|
||||||
} satisfies Deno.ServeDefaultExport;
|
} satisfies Deno.ServeDefaultExport;
|
||||||
|
|
||||||
export const VERSION = "0.3.0";
|
export const VERSION = "0.4.2";
|
||||||
|
@ -3,10 +3,10 @@ import { VERSION } from "./main.ts";
|
|||||||
import { createHandlers } from "./utils.ts";
|
import { createHandlers } from "./utils.ts";
|
||||||
|
|
||||||
export const rootHandler = createHandlers((c) => {
|
export const rootHandler = createHandlers((c) => {
|
||||||
let singer: Singer | Singer[] | null = null;
|
let singer: Singer | Singer[];
|
||||||
const shouldShowSpecialSinger = Math.random() < 0.016;
|
const shouldShowSpecialSinger = Math.random() < 0.016;
|
||||||
if (getSingerForBirthday().length !== 0) {
|
if (getSingerForBirthday().length !== 0) {
|
||||||
singer = getSingerForBirthday();
|
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
|
||||||
for (const s of singer) {
|
for (const s of singer) {
|
||||||
delete s.birthday;
|
delete s.birthday;
|
||||||
s.message = `祝${s.name}生日快乐~`;
|
s.message = `祝${s.name}生日快乐~`;
|
||||||
|
@ -12,7 +12,7 @@ const SnapshotQueryParamsSchema = object({
|
|||||||
reverse: boolean().optional(),
|
reverse: boolean().optional(),
|
||||||
});
|
});
|
||||||
|
|
||||||
const idSchema = mixed().test(
|
export const idSchema = mixed().test(
|
||||||
"is-valid-id",
|
"is-valid-id",
|
||||||
'id must be a string starting with "av" followed by digits, or "BV" followed by 10 alphanumeric characters, or a positive integer',
|
'id must be a string starting with "av" followed by digits, or "BV" followed by 10 alphanumeric characters, or a positive integer',
|
||||||
async (value) => {
|
async (value) => {
|
||||||
|
86
packages/backend/videoInfo.ts
Normal file
86
packages/backend/videoInfo.ts
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
import logger from "log/logger.ts";
|
||||||
|
import { Redis } from "ioredis";
|
||||||
|
import { number, ValidationError } from "yup";
|
||||||
|
import { createHandlers } from "./utils.ts";
|
||||||
|
import { getVideoInfo, getVideoInfoByBV } from "@crawler/net/videoInfo";
|
||||||
|
import { idSchema } from "./snapshots.ts";
|
||||||
|
import { NetSchedulerError } from "@core/net/delegate.ts";
|
||||||
|
import type { Context } from "hono";
|
||||||
|
import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import type { BlankEnv, BlankInput } from "hono/types";
|
||||||
|
import type { VideoInfoData } from "@core/net/bilibili.d.ts";
|
||||||
|
|
||||||
|
|
||||||
|
const redis = new Redis({ maxRetriesPerRequest: null });
|
||||||
|
const CACHE_EXPIRATION_SECONDS = 60;
|
||||||
|
|
||||||
|
type ContextType = Context<BlankEnv, "/video/:id/info", BlankInput>;
|
||||||
|
|
||||||
|
async function insertVideoSnapshot(client: Client, data: VideoInfoData) {
|
||||||
|
const views = data.stat.view;
|
||||||
|
const danmakus = data.stat.danmaku;
|
||||||
|
const replies = data.stat.reply;
|
||||||
|
const likes = data.stat.like;
|
||||||
|
const coins = data.stat.coin;
|
||||||
|
const shares = data.stat.share;
|
||||||
|
const favorites = data.stat.favorite;
|
||||||
|
const aid = data.aid;
|
||||||
|
|
||||||
|
const query: string = `
|
||||||
|
INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
||||||
|
`;
|
||||||
|
|
||||||
|
await client.queryObject(
|
||||||
|
query,
|
||||||
|
[aid, views, danmakus, replies, likes, coins, shares, favorites],
|
||||||
|
);
|
||||||
|
|
||||||
|
logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
export const videoInfoHandler = createHandlers(async (c: ContextType) => {
|
||||||
|
const client = c.get("db");
|
||||||
|
try {
|
||||||
|
const id = await idSchema.validate(c.req.param("id"));
|
||||||
|
let videoId: string | number = id as string;
|
||||||
|
if (videoId.startsWith("av")) {
|
||||||
|
videoId = parseInt(videoId.slice(2));
|
||||||
|
} else if (await number().isValid(videoId)) {
|
||||||
|
videoId = parseInt(videoId);
|
||||||
|
}
|
||||||
|
|
||||||
|
const cacheKey = `cvsa:videoInfo:${videoId}`;
|
||||||
|
const cachedData = await redis.get(cacheKey);
|
||||||
|
|
||||||
|
if (cachedData) {
|
||||||
|
return c.json(JSON.parse(cachedData));
|
||||||
|
}
|
||||||
|
|
||||||
|
let result: VideoInfoData | number;
|
||||||
|
if (typeof videoId === "number") {
|
||||||
|
result = await getVideoInfo(videoId, "getVideoInfo");
|
||||||
|
} else {
|
||||||
|
result = await getVideoInfoByBV(videoId, "getVideoInfo");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof result === "number") {
|
||||||
|
return c.json({ message: "Error fetching video info", code: result }, 500);
|
||||||
|
}
|
||||||
|
|
||||||
|
await redis.setex(cacheKey, CACHE_EXPIRATION_SECONDS, JSON.stringify(result));
|
||||||
|
|
||||||
|
await insertVideoSnapshot(client, result);
|
||||||
|
|
||||||
|
return c.json(result);
|
||||||
|
} catch (e) {
|
||||||
|
if (e instanceof ValidationError) {
|
||||||
|
return c.json({ message: "Invalid query parameters", errors: e.errors }, 400);
|
||||||
|
} else if (e instanceof NetSchedulerError) {
|
||||||
|
return c.json({ message: "Error fetching video info", code: e.code }, 500);
|
||||||
|
} else {
|
||||||
|
return c.json({ message: "Unhandled error", error: e }, 500);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
@ -1,4 +1,12 @@
|
|||||||
{
|
{
|
||||||
"name": "@cvsa/core",
|
"name": "@cvsa/core",
|
||||||
"exports": "./main.ts"
|
"exports": "./main.ts",
|
||||||
|
"imports": {
|
||||||
|
"ioredis": "npm:ioredis",
|
||||||
|
"log/": "./log/",
|
||||||
|
"db/": "./db/",
|
||||||
|
"$std/": "https://deno.land/std@0.216.0/",
|
||||||
|
"mq/": "./mq/",
|
||||||
|
"chalk": "npm:chalk"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import winston, { format, transports } from "npm:winston";
|
import winston, { format, transports } from "npm:winston";
|
||||||
import { TransformableInfo } from "npm:logform";
|
import type { TransformableInfo } from "npm:logform";
|
||||||
import chalk from "chalk";
|
import chalk from "chalk";
|
||||||
|
|
||||||
const customFormat = format.printf((info: TransformableInfo) => {
|
const customFormat = format.printf((info: TransformableInfo) => {
|
@ -1,4 +1,4 @@
|
|||||||
import { SlidingWindow } from "mq/slidingWindow.ts";
|
import { SlidingWindow } from "./slidingWindow.ts";
|
||||||
|
|
||||||
export interface RateLimiterConfig {
|
export interface RateLimiterConfig {
|
||||||
window: SlidingWindow;
|
window: SlidingWindow;
|
@ -1,5 +1,5 @@
|
|||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { RateLimiter, RateLimiterConfig } from "mq/rateLimiter.ts";
|
import { RateLimiter, type RateLimiterConfig } from "mq/rateLimiter.ts";
|
||||||
import { SlidingWindow } from "mq/slidingWindow.ts";
|
import { SlidingWindow } from "mq/slidingWindow.ts";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "db/redis.ts";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
@ -69,14 +69,6 @@ class NetworkDelegate {
|
|||||||
this.proxies[proxyName] = { type, data };
|
this.proxies[proxyName] = { type, data };
|
||||||
}
|
}
|
||||||
|
|
||||||
private cleanupProxyLimiters(proxyName: string): void {
|
|
||||||
for (const limiterId in this.proxyLimiters) {
|
|
||||||
if (limiterId.startsWith(`proxy-${proxyName}`)) {
|
|
||||||
delete this.proxyLimiters[limiterId];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
|
addTask(taskName: string, provider: string, proxies: string[] | "all"): void {
|
||||||
this.tasks[taskName] = { provider, proxies };
|
this.tasks[taskName] = { provider, proxies };
|
||||||
}
|
}
|
||||||
@ -271,6 +263,7 @@ class NetworkDelegate {
|
|||||||
const out = decoder.decode(output.stdout);
|
const out = decoder.decode(output.stdout);
|
||||||
const rawData = JSON.parse(out);
|
const rawData = JSON.parse(out);
|
||||||
if (rawData.statusCode !== 200) {
|
if (rawData.statusCode !== 200) {
|
||||||
|
// noinspection ExceptionCaughtLocallyJS
|
||||||
throw new NetSchedulerError(
|
throw new NetSchedulerError(
|
||||||
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
`Error proxying ${url} to ali-fc region ${region}, code: ${rawData.statusCode}.`,
|
||||||
"ALICLOUD_PROXY_ERR",
|
"ALICLOUD_PROXY_ERR",
|
@ -4,7 +4,15 @@ import { SnapshotNumber } from "mq/task/getVideoStats.ts";
|
|||||||
|
|
||||||
export async function getVideosNearMilestone(client: Client) {
|
export async function getVideosNearMilestone(client: Client) {
|
||||||
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
const queryResult = await client.queryObject<LatestSnapshotType>(`
|
||||||
SELECT ls
|
SELECT ls.*
|
||||||
|
FROM latest_video_snapshot ls
|
||||||
|
RIGHT JOIN songs ON songs.aid = ls.aid
|
||||||
|
WHERE
|
||||||
|
(views >= 50000 AND views < 100000) OR
|
||||||
|
(views >= 900000 AND views < 1000000) OR
|
||||||
|
(views >= 9900000 AND views < 10000000)
|
||||||
|
UNION
|
||||||
|
SELECT ls.*
|
||||||
FROM latest_video_snapshot ls
|
FROM latest_video_snapshot ls
|
||||||
WHERE
|
WHERE
|
||||||
(views >= 90000 AND views < 100000) OR
|
(views >= 90000 AND views < 100000) OR
|
||||||
|
@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
|||||||
import { SnapshotScheduleType } from "@core/db/schema";
|
import { SnapshotScheduleType } from "@core/db/schema";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { MINUTE } from "$std/datetime/constants.ts";
|
import { MINUTE } from "$std/datetime/constants.ts";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
import { Redis } from "ioredis";
|
import { Redis } from "ioredis";
|
||||||
|
|
||||||
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
const REDIS_KEY = "cvsa:snapshot_window_counts";
|
||||||
@ -69,6 +69,14 @@ export async function videoHasActiveSchedule(client: Client, aid: number) {
|
|||||||
return res.rows.length > 0;
|
return res.rows.length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function videoHasActiveScheduleWithType(client: Client, aid: number, type: string) {
|
||||||
|
const res = await client.queryObject<{ status: string }>(
|
||||||
|
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing') AND type = $2`,
|
||||||
|
[aid, type],
|
||||||
|
);
|
||||||
|
return res.rows.length > 0;
|
||||||
|
}
|
||||||
|
|
||||||
export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
export async function videoHasProcessingSchedule(client: Client, aid: number) {
|
||||||
const res = await client.queryObject<{ status: string }>(
|
const res = await client.queryObject<{ status: string }>(
|
||||||
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
|
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND status = 'processing'`,
|
||||||
@ -173,7 +181,7 @@ export async function scheduleSnapshot(
|
|||||||
targetTime: number,
|
targetTime: number,
|
||||||
force: boolean = false,
|
force: boolean = false,
|
||||||
) {
|
) {
|
||||||
if (await videoHasActiveSchedule(client, aid) && !force) return;
|
if (await videoHasActiveScheduleWithType(client, aid, type) && !force) return;
|
||||||
let adjustedTime = new Date(targetTime);
|
let adjustedTime = new Date(targetTime);
|
||||||
if (type !== "milestone" && type !== "new") {
|
if (type !== "milestone" && type !== "new") {
|
||||||
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
|
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
|
||||||
@ -266,8 +274,14 @@ export async function getBulkSnapshotsInNextSecond(client: Client) {
|
|||||||
const query = `
|
const query = `
|
||||||
SELECT *
|
SELECT *
|
||||||
FROM snapshot_schedule
|
FROM snapshot_schedule
|
||||||
WHERE started_at <= NOW() + INTERVAL '15 seconds' AND status = 'pending' AND type = 'normal'
|
WHERE (started_at <= NOW() + INTERVAL '15 seconds')
|
||||||
ORDER BY started_at
|
AND status = 'pending'
|
||||||
|
AND (type = 'normal' OR type = 'archive')
|
||||||
|
ORDER BY CASE
|
||||||
|
WHEN type = 'normal' THEN 1
|
||||||
|
WHEN type = 'archive' THEN 2
|
||||||
|
END,
|
||||||
|
started_at
|
||||||
LIMIT 1000;
|
LIMIT 1000;
|
||||||
`;
|
`;
|
||||||
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
const res = await client.queryObject<SnapshotScheduleType>(query, []);
|
||||||
@ -298,3 +312,15 @@ export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
|||||||
const res = await client.queryObject<{ aid: number }>(query, []);
|
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||||
return res.rows.map((r) => Number(r.aid));
|
return res.rows.map((r) => Number(r.aid));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||||
|
const query: string = `
|
||||||
|
SELECT s.aid
|
||||||
|
FROM bilibili_metadata s
|
||||||
|
LEFT JOIN snapshot_schedule ss ON s.aid = ss.aid AND (ss.status = 'pending' OR ss.status = 'processing')
|
||||||
|
WHERE ss.aid IS NULL
|
||||||
|
`;
|
||||||
|
const res = await client.queryObject<{ aid: number }>(query, []);
|
||||||
|
return res.rows.map((r) => Number(r.aid));
|
||||||
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@
|
|||||||
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
|
||||||
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
|
||||||
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
|
||||||
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
"all": "concurrently --restart-tries -1 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
|
||||||
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"
|
||||||
},
|
},
|
||||||
"lint": {
|
"lint": {
|
||||||
@ -27,7 +27,8 @@
|
|||||||
"bullmq": "npm:bullmq",
|
"bullmq": "npm:bullmq",
|
||||||
"mq/": "./mq/",
|
"mq/": "./mq/",
|
||||||
"db/": "./db/",
|
"db/": "./db/",
|
||||||
"log/": "./log/",
|
"@core/": "../core/",
|
||||||
|
"log/": "../core/log/",
|
||||||
"net/": "./net/",
|
"net/": "./net/",
|
||||||
"ml/": "./ml/",
|
"ml/": "./ml/",
|
||||||
"utils/": "./utils/",
|
"utils/": "./utils/",
|
||||||
|
@ -6,7 +6,7 @@ import {
|
|||||||
bulkScheduleSnapshot,
|
bulkScheduleSnapshot,
|
||||||
bulkSetSnapshotStatus,
|
bulkSetSnapshotStatus,
|
||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
findSnapshotBefore,
|
findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
|
||||||
getBulkSnapshotsInNextSecond,
|
getBulkSnapshotsInNextSecond,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
getSnapshotsInNextSecond,
|
getSnapshotsInNextSecond,
|
||||||
@ -21,13 +21,14 @@ import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
|
|||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { SnapshotQueue } from "mq/index.ts";
|
import { SnapshotQueue } from "mq/index.ts";
|
||||||
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
|
||||||
import { NetSchedulerError } from "net/delegate.ts";
|
import { NetSchedulerError } from "@core/net/delegate.ts";
|
||||||
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
|
||||||
import { truncate } from "utils/truncate.ts";
|
import { truncate } from "utils/truncate.ts";
|
||||||
import { lockManager } from "mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { getSongsPublihsedAt } from "db/songs.ts";
|
import { getSongsPublihsedAt } from "db/songs.ts";
|
||||||
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
|
||||||
import { getAdjustedShortTermETA } from "../scheduling.ts";
|
import { getAdjustedShortTermETA } from "../scheduling.ts";
|
||||||
|
import {SnapshotScheduleType} from "@core/db/schema";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
@ -52,15 +53,22 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
|
|||||||
const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
|
const filteredAids = await bulkGetVideosWithoutProcessingSchedules(client, aids);
|
||||||
if (filteredAids.length === 0) continue;
|
if (filteredAids.length === 0) continue;
|
||||||
await bulkSetSnapshotStatus(client, filteredAids, "processing");
|
await bulkSetSnapshotStatus(client, filteredAids, "processing");
|
||||||
const dataMap: { [key: number]: number } = {};
|
const schedulesData = group.map((schedule) => {
|
||||||
for (const schedule of group) {
|
return {
|
||||||
const id = Number(schedule.id);
|
aid: Number(schedule.aid),
|
||||||
dataMap[id] = Number(schedule.aid);
|
id: Number(schedule.id),
|
||||||
|
type: schedule.type,
|
||||||
|
created_at: schedule.created_at,
|
||||||
|
started_at: schedule.started_at,
|
||||||
|
finished_at: schedule.finished_at,
|
||||||
|
status: schedule.status
|
||||||
}
|
}
|
||||||
|
})
|
||||||
await SnapshotQueue.add("bulkSnapshotVideo", {
|
await SnapshotQueue.add("bulkSnapshotVideo", {
|
||||||
map: dataMap,
|
schedules: schedulesData,
|
||||||
}, { priority: 3 });
|
}, { priority: 3 });
|
||||||
}
|
}
|
||||||
|
return `OK`
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error);
|
logger.error(e as Error);
|
||||||
} finally {
|
} finally {
|
||||||
@ -74,7 +82,7 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
const schedules = await getSnapshotsInNextSecond(client);
|
const schedules = await getSnapshotsInNextSecond(client);
|
||||||
for (const schedule of schedules) {
|
for (const schedule of schedules) {
|
||||||
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
if (await videoHasProcessingSchedule(client, Number(schedule.aid))) {
|
||||||
return `ALREADY_PROCESSING`;
|
continue;
|
||||||
}
|
}
|
||||||
let priority = 3;
|
let priority = 3;
|
||||||
if (schedule.type && priorityMap[schedule.type]) {
|
if (schedule.type && priorityMap[schedule.type]) {
|
||||||
@ -83,11 +91,12 @@ export const snapshotTickWorker = async (_job: Job) => {
|
|||||||
const aid = Number(schedule.aid);
|
const aid = Number(schedule.aid);
|
||||||
await setSnapshotStatus(client, schedule.id, "processing");
|
await setSnapshotStatus(client, schedule.id, "processing");
|
||||||
await SnapshotQueue.add("snapshotVideo", {
|
await SnapshotQueue.add("snapshotVideo", {
|
||||||
aid: aid,
|
aid: Number(aid),
|
||||||
id: Number(schedule.id),
|
id: Number(schedule.id),
|
||||||
type: schedule.type ?? "normal",
|
type: schedule.type ?? "normal",
|
||||||
}, { priority });
|
}, { priority });
|
||||||
}
|
}
|
||||||
|
return `OK`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error);
|
logger.error(e as Error);
|
||||||
} finally {
|
} finally {
|
||||||
@ -108,14 +117,15 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
|
|||||||
for (const video of videos) {
|
for (const video of videos) {
|
||||||
const aid = Number(video.aid);
|
const aid = Number(video.aid);
|
||||||
const eta = await getAdjustedShortTermETA(client, aid);
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) continue;
|
if (eta > 144) continue;
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const scheduledNextSnapshotDelay = eta * HOUR;
|
const scheduledNextSnapshotDelay = eta * HOUR;
|
||||||
const maxInterval = 4 * HOUR;
|
const maxInterval = 4 * HOUR;
|
||||||
const minInterval = 1 * SECOND;
|
const minInterval = 1 * SECOND;
|
||||||
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
|
||||||
const targetTime = now + delay;
|
const targetTime = now + delay;
|
||||||
await scheduleSnapshot(client, aid, "milestone", targetTime, true);
|
await scheduleSnapshot(client, aid, "milestone", targetTime);
|
||||||
|
logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
|
||||||
}
|
}
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
|
||||||
@ -143,6 +153,38 @@ const getRegularSnapshotInterval = async (client: Client, aid: number) => {
|
|||||||
return 6;
|
return 6;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const archiveSnapshotsWorker = async (_job: Job) => {
|
||||||
|
const client = await db.connect();
|
||||||
|
const startedAt = Date.now();
|
||||||
|
if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
|
||||||
|
logger.log("dispatchArchiveSnapshots is already running", "mq");
|
||||||
|
client.release();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
|
||||||
|
try {
|
||||||
|
const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
|
||||||
|
for (const rawAid of aids) {
|
||||||
|
const aid = Number(rawAid);
|
||||||
|
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
|
||||||
|
const now = Date.now();
|
||||||
|
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||||
|
const interval = 168;
|
||||||
|
logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker");
|
||||||
|
const targetTime = lastSnapshotedAt + interval * HOUR;
|
||||||
|
await scheduleSnapshot(client, aid, "archive", targetTime);
|
||||||
|
if (now - startedAt > 250 * MINUTE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
|
||||||
|
} finally {
|
||||||
|
await lockManager.releaseLock("dispatchArchiveSnapshots");
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
export const regularSnapshotsWorker = async (_job: Job) => {
|
export const regularSnapshotsWorker = async (_job: Job) => {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
const startedAt = Date.now();
|
const startedAt = Date.now();
|
||||||
@ -176,13 +218,14 @@ export const regularSnapshotsWorker = async (_job: Job) => {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
||||||
const dataMap: { [key: number]: number } = job.data.map;
|
const schedules: SnapshotScheduleType[] = job.data.schedules;
|
||||||
const ids = Object.keys(dataMap).map((id) => Number(id));
|
const ids = schedules.map((schedule) => Number(schedule.id));
|
||||||
const aidsToFetch: number[] = [];
|
const aidsToFetch: number[] = [];
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
try {
|
try {
|
||||||
for (const id of ids) {
|
for (const schedule of schedules) {
|
||||||
const aid = Number(dataMap[id]);
|
const aid = Number(schedule.aid);
|
||||||
|
const id = Number(schedule.id);
|
||||||
const exists = await snapshotScheduleExists(client, id);
|
const exists = await snapshotScheduleExists(client, id);
|
||||||
if (!exists) {
|
if (!exists) {
|
||||||
continue;
|
continue;
|
||||||
@ -217,7 +260,11 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
|||||||
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
|
logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
|
||||||
}
|
}
|
||||||
await bulkSetSnapshotStatus(client, ids, "completed");
|
await bulkSetSnapshotStatus(client, ids, "completed");
|
||||||
for (const aid of aidsToFetch) {
|
|
||||||
|
for (const schedule of schedules) {
|
||||||
|
const aid = Number(schedule.aid);
|
||||||
|
const type = schedule.type;
|
||||||
|
if (type == 'archive') continue;
|
||||||
const interval = await getRegularSnapshotInterval(client, aid);
|
const interval = await getRegularSnapshotInterval(client, aid);
|
||||||
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
|
||||||
await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
|
await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
|
||||||
@ -230,8 +277,8 @@ export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
|||||||
"mq",
|
"mq",
|
||||||
"fn:takeBulkSnapshotForVideosWorker",
|
"fn:takeBulkSnapshotForVideosWorker",
|
||||||
);
|
);
|
||||||
await bulkSetSnapshotStatus(client, ids, "completed");
|
await bulkSetSnapshotStatus(client, ids, "no_proxy");
|
||||||
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 2 * MINUTE);
|
await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
|
logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
|
||||||
@ -296,10 +343,11 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
}
|
}
|
||||||
if (type !== "milestone") return `DONE`;
|
if (type !== "milestone") return `DONE`;
|
||||||
const eta = await getAdjustedShortTermETA(client, aid);
|
const eta = await getAdjustedShortTermETA(client, aid);
|
||||||
if (eta > 72) return "ETA_TOO_LONG";
|
if (eta > 144) return "ETA_TOO_LONG";
|
||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
const targetTime = now + eta * HOUR;
|
const targetTime = now + eta * HOUR;
|
||||||
await scheduleSnapshot(client, aid, type, targetTime);
|
await scheduleSnapshot(client, aid, type, targetTime);
|
||||||
|
await setSnapshotStatus(client, id, "completed");
|
||||||
return `DONE`;
|
return `DONE`;
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
|
||||||
@ -308,7 +356,7 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
|
|||||||
"mq",
|
"mq",
|
||||||
"fn:takeSnapshotForVideoWorker",
|
"fn:takeSnapshotForVideoWorker",
|
||||||
);
|
);
|
||||||
await setSnapshotStatus(client, id, "completed");
|
await setSnapshotStatus(client, id, "no_proxy");
|
||||||
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
|
import { HOUR, MINUTE, SECOND } from "$std/datetime/constants.ts";
|
||||||
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
|
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "mq/index.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
|
import { initSnapshotWindowCounts } from "db/snapshotSchedule.ts";
|
||||||
import { db } from "db/init.ts";
|
import { db } from "db/init.ts";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
|
|
||||||
export async function initMQ() {
|
export async function initMQ() {
|
||||||
const client = await db.connect();
|
const client = await db.connect();
|
||||||
@ -30,8 +30,8 @@ export async function initMQ() {
|
|||||||
immediately: true,
|
immediately: true,
|
||||||
}, {
|
}, {
|
||||||
opts: {
|
opts: {
|
||||||
removeOnComplete: 1,
|
removeOnComplete: 300,
|
||||||
removeOnFail: 1,
|
removeOnFail: 600,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -40,8 +40,8 @@ export async function initMQ() {
|
|||||||
immediately: true,
|
immediately: true,
|
||||||
}, {
|
}, {
|
||||||
opts: {
|
opts: {
|
||||||
removeOnComplete: 1,
|
removeOnComplete: 60,
|
||||||
removeOnFail: 1,
|
removeOnFail: 600,
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
@ -55,6 +55,11 @@ export async function initMQ() {
|
|||||||
immediately: true,
|
immediately: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await SnapshotQueue.upsertJobScheduler("dispatchArchiveSnapshots", {
|
||||||
|
every: 6 * HOUR,
|
||||||
|
immediately: true,
|
||||||
|
});
|
||||||
|
|
||||||
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
|
||||||
every: 30 * MINUTE,
|
every: 30 * MINUTE,
|
||||||
immediately: true,
|
immediately: true,
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import { Redis } from "ioredis";
|
import { Redis } from "ioredis";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "../../core/db/redis.ts";
|
||||||
|
|
||||||
class LockManager {
|
class LockManager {
|
||||||
private redis: Redis;
|
private redis: Redis;
|
||||||
|
@ -1,9 +1,3 @@
|
|||||||
/*
|
|
||||||
* Returns the minimum ETA in hours for the next snapshot
|
|
||||||
* @param client - Postgres client
|
|
||||||
* @param aid - aid of the video
|
|
||||||
* @returns ETA in hours
|
|
||||||
*/
|
|
||||||
import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db/snapshotSchedule.ts";
|
import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db/snapshotSchedule.ts";
|
||||||
import { truncate } from "utils/truncate.ts";
|
import { truncate } from "utils/truncate.ts";
|
||||||
import { closetMilestone } from "./exec/snapshotTick.ts";
|
import { closetMilestone } from "./exec/snapshotTick.ts";
|
||||||
@ -12,6 +6,12 @@ import { HOUR, MINUTE } from "$std/datetime/constants.ts";
|
|||||||
|
|
||||||
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
|
const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns the minimum ETA in hours for the next snapshot
|
||||||
|
* @param client - Postgres client
|
||||||
|
* @param aid - aid of the video
|
||||||
|
* @returns ETA in hours
|
||||||
|
*/
|
||||||
export const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
export const getAdjustedShortTermETA = async (client: Client, aid: number) => {
|
||||||
const latestSnapshot = await getLatestSnapshot(client, aid);
|
const latestSnapshot = await getLatestSnapshot(client, aid);
|
||||||
// Immediately dispatch a snapshot if there is no snapshot yet
|
// Immediately dispatch a snapshot if there is no snapshot yet
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import networkDelegate from "./delegate.ts";
|
import networkDelegate from "@core/net/delegate.ts";
|
||||||
import { MediaListInfoData, MediaListInfoResponse } from "net/bilibili.d.ts";
|
import { MediaListInfoData, MediaListInfoResponse } from "@core/net/bilibili.d.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
import { VideoListResponse } from "net/bilibili.d.ts";
|
import { VideoListResponse } from "@core/net/bilibili.d.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import networkDelegate from "./delegate.ts";
|
import networkDelegate from "@core/net/delegate.ts";
|
||||||
|
|
||||||
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
|
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
|
||||||
const startFrom = 1 + pageSize * (page - 1);
|
const startFrom = 1 + pageSize * (page - 1);
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import networkDelegate from "./delegate.ts";
|
import networkDelegate from "@core/net/delegate.ts";
|
||||||
import { VideoDetailsData, VideoDetailsResponse } from "net/bilibili.d.ts";
|
import { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
|
export async function getVideoDetails(aid: number): Promise<VideoDetailsData | null> {
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
import networkDelegate from "./delegate.ts";
|
import networkDelegate from "@core/net/delegate.ts";
|
||||||
import { VideoInfoData, VideoInfoResponse } from "net/bilibili.d.ts";
|
import { VideoInfoData, VideoInfoResponse } from "@core/net/bilibili.d.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -25,3 +25,27 @@ export async function getVideoInfo(aid: number, task: string): Promise<VideoInfo
|
|||||||
}
|
}
|
||||||
return data.data;
|
return data.data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fetch video metadata from bilibili API by BVID
|
||||||
|
* @param {string} bvid - The video's BVID
|
||||||
|
* @param {string} task - The task name used in scheduler. It can be one of the following:
|
||||||
|
* - snapshotVideo
|
||||||
|
* - getVideoInfo
|
||||||
|
* - snapshotMilestoneVideo
|
||||||
|
* @returns {Promise<VideoInfoData | number>} VideoInfoData or the error code returned by bilibili API
|
||||||
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
|
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||||
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
|
*/
|
||||||
|
export async function getVideoInfoByBV(bvid: string, task: string): Promise<VideoInfoData | number> {
|
||||||
|
const url = `https://api.bilibili.com/x/web-interface/view?bvid=${bvid}`;
|
||||||
|
const data = await networkDelegate.request<VideoInfoResponse>(url, task);
|
||||||
|
const errMessage = `Error fetching metadata for ${bvid}:`;
|
||||||
|
if (data.code !== 0) {
|
||||||
|
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfoByBV");
|
||||||
|
return data.code;
|
||||||
|
}
|
||||||
|
return data.data;
|
||||||
|
}
|
@ -1,5 +1,5 @@
|
|||||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "../../core/db/redis.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
|
import { classifyVideosWorker, classifyVideoWorker } from "mq/exec/classifyVideo.ts";
|
||||||
import { WorkerError } from "mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
import { ConnectionOptions, Job, Worker } from "bullmq";
|
import { ConnectionOptions, Job, Worker } from "bullmq";
|
||||||
import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
|
import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
|
||||||
import { redis } from "db/redis.ts";
|
import { redis } from "@core/db/redis.ts";
|
||||||
import logger from "log/logger.ts";
|
import logger from "log/logger.ts";
|
||||||
import { lockManager } from "mq/lockManager.ts";
|
import { lockManager } from "mq/lockManager.ts";
|
||||||
import { WorkerError } from "mq/schema.ts";
|
import { WorkerError } from "mq/schema.ts";
|
||||||
import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
|
import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
|
||||||
import {
|
import {
|
||||||
|
archiveSnapshotsWorker,
|
||||||
bulkSnapshotTickWorker,
|
bulkSnapshotTickWorker,
|
||||||
collectMilestoneSnapshotsWorker,
|
collectMilestoneSnapshotsWorker,
|
||||||
regularSnapshotsWorker,
|
regularSnapshotsWorker,
|
||||||
@ -15,8 +16,21 @@ import {
|
|||||||
takeSnapshotForVideoWorker,
|
takeSnapshotForVideoWorker,
|
||||||
} from "mq/exec/snapshotTick.ts";
|
} from "mq/exec/snapshotTick.ts";
|
||||||
|
|
||||||
|
const releaseLockForJob = async (name: string) => {
|
||||||
|
await lockManager.releaseLock(name);
|
||||||
|
logger.log(`Released lock: ${name}`, "mq");
|
||||||
|
}
|
||||||
|
|
||||||
|
const releaseAllLocks = async () => {
|
||||||
|
const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
|
||||||
|
for (const lock of locks) {
|
||||||
|
await releaseLockForJob(lock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Deno.addSignalListener("SIGINT", async () => {
|
Deno.addSignalListener("SIGINT", async () => {
|
||||||
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
logger.log("SIGINT Received: Shutting down workers...", "mq");
|
||||||
|
await releaseAllLocks();
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
await snapshotWorker.close(true);
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
@ -24,6 +38,7 @@ Deno.addSignalListener("SIGINT", async () => {
|
|||||||
|
|
||||||
Deno.addSignalListener("SIGTERM", async () => {
|
Deno.addSignalListener("SIGTERM", async () => {
|
||||||
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
logger.log("SIGTERM Received: Shutting down workers...", "mq");
|
||||||
|
await releaseAllLocks();
|
||||||
await latestVideoWorker.close(true);
|
await latestVideoWorker.close(true);
|
||||||
await snapshotWorker.close(true);
|
await snapshotWorker.close(true);
|
||||||
Deno.exit();
|
Deno.exit();
|
||||||
@ -34,14 +49,11 @@ const latestVideoWorker = new Worker(
|
|||||||
async (job: Job) => {
|
async (job: Job) => {
|
||||||
switch (job.name) {
|
switch (job.name) {
|
||||||
case "getLatestVideos":
|
case "getLatestVideos":
|
||||||
await getLatestVideosWorker(job);
|
return await getLatestVideosWorker(job);
|
||||||
break;
|
|
||||||
case "getVideoInfo":
|
case "getVideoInfo":
|
||||||
await getVideoInfoWorker(job);
|
return await getVideoInfoWorker(job);
|
||||||
break;
|
|
||||||
case "collectSongs":
|
case "collectSongs":
|
||||||
await collectSongsWorker(job);
|
return await collectSongsWorker(job);
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -63,35 +75,26 @@ latestVideoWorker.on("error", (err) => {
|
|||||||
logger.error(e.rawError, e.service, e.codePath);
|
logger.error(e.rawError, e.service, e.codePath);
|
||||||
});
|
});
|
||||||
|
|
||||||
latestVideoWorker.on("closed", async () => {
|
|
||||||
await lockManager.releaseLock("getLatestVideos");
|
|
||||||
});
|
|
||||||
|
|
||||||
const snapshotWorker = new Worker(
|
const snapshotWorker = new Worker(
|
||||||
"snapshot",
|
"snapshot",
|
||||||
async (job: Job) => {
|
async (job: Job) => {
|
||||||
switch (job.name) {
|
switch (job.name) {
|
||||||
case "snapshotVideo":
|
case "snapshotVideo":
|
||||||
await takeSnapshotForVideoWorker(job);
|
return await takeSnapshotForVideoWorker(job);
|
||||||
break;
|
|
||||||
case "snapshotTick":
|
case "snapshotTick":
|
||||||
await snapshotTickWorker(job);
|
return await snapshotTickWorker(job);
|
||||||
break;
|
|
||||||
case "collectMilestoneSnapshots":
|
case "collectMilestoneSnapshots":
|
||||||
await collectMilestoneSnapshotsWorker(job);
|
return await collectMilestoneSnapshotsWorker(job);
|
||||||
break;
|
|
||||||
case "dispatchRegularSnapshots":
|
case "dispatchRegularSnapshots":
|
||||||
await regularSnapshotsWorker(job);
|
return await regularSnapshotsWorker(job);
|
||||||
break;
|
|
||||||
case "scheduleCleanup":
|
case "scheduleCleanup":
|
||||||
await scheduleCleanupWorker(job);
|
return await scheduleCleanupWorker(job);
|
||||||
break;
|
|
||||||
case "bulkSnapshotVideo":
|
case "bulkSnapshotVideo":
|
||||||
await takeBulkSnapshotForVideosWorker(job);
|
return await takeBulkSnapshotForVideosWorker(job);
|
||||||
break;
|
|
||||||
case "bulkSnapshotTick":
|
case "bulkSnapshotTick":
|
||||||
await bulkSnapshotTickWorker(job);
|
return await bulkSnapshotTickWorker(job);
|
||||||
break;
|
case "dispatchArchiveSnapshots":
|
||||||
|
return await archiveSnapshotsWorker(job);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -103,7 +106,3 @@ snapshotWorker.on("error", (err) => {
|
|||||||
const e = err as WorkerError;
|
const e = err as WorkerError;
|
||||||
logger.error(e.rawError, e.service, e.codePath);
|
logger.error(e.rawError, e.service, e.codePath);
|
||||||
});
|
});
|
||||||
|
|
||||||
snapshotWorker.on("closed", async () => {
|
|
||||||
await lockManager.releaseLock("dispatchRegularSnapshots");
|
|
||||||
});
|
|
||||||
|
1012
packages/frontend/bun.lock
Normal file
1012
packages/frontend/bun.lock
Normal file
File diff suppressed because it is too large
Load Diff
@ -9,12 +9,15 @@
|
|||||||
"astro": "astro"
|
"astro": "astro"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
|
"@astrojs/node": "^9.1.3",
|
||||||
|
"@astrojs/svelte": "^7.0.9",
|
||||||
"@astrojs/tailwind": "^6.0.2",
|
"@astrojs/tailwind": "^6.0.2",
|
||||||
"argon2id": "^1.0.1",
|
"argon2id": "^1.0.1",
|
||||||
"astro": "^5.5.5",
|
"astro": "^5.5.5",
|
||||||
"autoprefixer": "^10.4.21",
|
"autoprefixer": "^10.4.21",
|
||||||
"pg": "^8.11.11",
|
"pg": "^8.11.11",
|
||||||
"postcss": "^8.5.3",
|
"postcss": "^8.5.3",
|
||||||
|
"svelte": "^5.25.7",
|
||||||
"tailwindcss": "^3.0.24",
|
"tailwindcss": "^3.0.24",
|
||||||
"vite-tsconfig-paths": "^5.1.4"
|
"vite-tsconfig-paths": "^5.1.4"
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user