Compare commits
13 Commits
0614067278 ... 7689e687ff

Author | SHA1 | Date
---|---|---
 | 7689e687ff |
 | 651eef0b9e |
 | 68bd46fd8a |
 | 13ea8fec8b |
 | 3d9e98c949 |
 | c7dd1cfc2e |
 | e0a19499e1 |
 | 0930bbe6f4 |
 | 054d28e796 |
 | b080c51c3e |
 | a9582722f4 |
 | 4ee4d2ede9 |
 | f21ff45dd3 |
@@ -1,6 +1,6 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
 	<component name="DenoSettings">
-		<option name="useDenoValue" value="ENABLE" />
+		<option name="denoInit" value="{ "enable": true, "lint": true, "unstable": true, "importMap": "import_map.json", "config": "deno.json", "fmt": { "useTabs": true, "lineWidth": 120, "indentWidth": 4, "semiColons": true, "proseWrap": "always" } }" />
 	</component>
 </project>
@@ -3,33 +3,33 @@
 // For a full list of overridable settings, and general information on folder-specific settings,
 // see the documentation: https://zed.dev/docs/configuring-zed#settings-files
 {
 	"lsp": {
 		"deno": {
 			"settings": {
 				"deno": {
 					"enable": true
 				}
 			}
 		}
 	},
 	"languages": {
 		"TypeScript": {
 			"language_servers": [
 				"deno",
 				"!typescript-language-server",
 				"!vtsls",
 				"!eslint"
 			],
 			"formatter": "language_server"
 		},
 		"TSX": {
 			"language_servers": [
 				"deno",
 				"!typescript-language-server",
 				"!vtsls",
 				"!eslint"
 			],
 			"formatter": "language_server"
 		}
 	}
 }
@@ -10,7 +10,6 @@ import type { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import type { BlankEnv, BlankInput } from "hono/types";
 import type { VideoInfoData } from "@core/net/bilibili.d.ts";
 
-
 const redis = new Redis({ maxRetriesPerRequest: null });
 const CACHE_EXPIRATION_SECONDS = 60;
 
@@ -39,7 +38,6 @@ async function insertVideoSnapshot(client: Client, data: VideoInfoData) {
 	logger.log(`Inserted into snapshot for video ${aid} by videoInfo API.`, "api", "fn:insertVideoSnapshot");
 }
 
-
 export const videoInfoHandler = createHandlers(async (c: ContextType) => {
 	const client = c.get("db");
 	try {
@@ -83,4 +81,4 @@ export const videoInfoHandler = createHandlers(async (c: ContextType) => {
 		return c.json({ message: "Unhandled error", error: e }, 500);
 	}
 	}
 });
@@ -323,4 +323,3 @@ export async function getAllVideosWithoutActiveSnapshotSchedule(client: Client)
 	const res = await client.queryObject<{ aid: number }>(query, []);
 	return res.rows.map((r) => Number(r.aid));
 }
-
@@ -11,7 +11,7 @@ import { db } from "db/init.ts";
  */
 export async function withDbConnection<T>(
 	operation: (client: Client) => Promise<T>,
-	errorHandling?: (error: unknown) => void,
+	errorHandling?: (error: unknown, client: Client) => void,
 	cleanup?: () => void,
 ): Promise<T | undefined> {
 	const client = await db.connect();
@@ -19,7 +19,7 @@ export async function withDbConnection<T>(
 		return await operation(client);
 	} catch (error) {
 		if (errorHandling) {
-			errorHandling(error);
+			errorHandling(error, client);
 			return;
 		}
 		throw error;
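Note: with the pooled client now passed to the error callback, a worker can persist failure state on the same connection before it is released, the pattern the new snapshotVideo.ts adopts further down. A minimal sketch under that assumption (the worker here is illustrative; setSnapshotStatus and the import-map aliases come from this repo):

```ts
// Sketch of the updated withDbConnection signature: the error handler
// receives the same client as the operation callback.
import { withDbConnection } from "db/withConnection.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { setSnapshotStatus } from "db/snapshotSchedule.ts";
import logger from "log/logger.ts";

export const exampleWorker = (id: number) =>
	withDbConnection<void>(async (client: Client) => {
		// ... snapshot work that may throw ...
	}, async (e, client) => {
		// mark the schedule as failed using the client handed to the handler
		await setSnapshotStatus(client, id, "failed");
		logger.error(e as Error, "mq", "fn:exampleWorker");
	});
```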
@@ -23,6 +23,7 @@
 	"imports": {
 		"@std/assert": "jsr:@std/assert@1",
 		"$std/": "https://deno.land/std@0.216.0/",
+		"@std/datetime": "jsr:@std/datetime@^0.225.4",
 		"@huggingface/transformers": "npm:@huggingface/transformers@3.0.0",
 		"bullmq": "npm:bullmq",
 		"mq/": "./mq/",
|
@ -4,4 +4,4 @@
|
|||||||
// SO HERE'S A PLACHOLDER EXPORT FOR DENO:
|
// SO HERE'S A PLACHOLDER EXPORT FOR DENO:
|
||||||
export const DENO = "FUCK YOU DENO";
|
export const DENO = "FUCK YOU DENO";
|
||||||
// Oh, maybe export the version is a good idea
|
// Oh, maybe export the version is a good idea
|
||||||
export const VERSION = "1.0.18";
|
export const VERSION = "1.0.26";
|
||||||
|
packages/crawler/mq/exec/archiveSnapshots.ts (new file, 40 lines)
@@ -0,0 +1,40 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { getAllVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "log/logger.ts";
+import { lockManager } from "mq/lockManager.ts";
+import { getLatestVideoSnapshot } from "db/snapshot.ts";
+import { HOUR, MINUTE } from "$std/datetime/constants.ts";
+
+export const archiveSnapshotsWorker = async (_job: Job) =>
+	await withDbConnection<void>(async (client: Client) => {
+		const startedAt = Date.now();
+		if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
+			logger.log("dispatchArchiveSnapshots is already running", "mq");
+			return;
+		}
+		await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
+		const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
+		for (const rawAid of aids) {
+			const aid = Number(rawAid);
+			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+			const now = Date.now();
+			const lastSnapshotedAt = latestSnapshot?.time ?? now;
+			const interval = 168;
+			logger.log(
+				`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
+				"mq",
+				"fn:archiveSnapshotsWorker",
+			);
+			const targetTime = lastSnapshotedAt + interval * HOUR;
+			await scheduleSnapshot(client, aid, "archive", targetTime);
+			if (now - startedAt > 250 * MINUTE) {
+				return;
+			}
+		}
+	}, (e) => {
+		logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
+	}, async () => {
+		await lockManager.releaseLock("dispatchArchiveSnapshots");
+	});
@@ -8,7 +8,7 @@ import { lockManager } from "mq/lockManager.ts";
 import { aidExistsInSongs } from "db/songs.ts";
 import { insertIntoSongs } from "mq/task/collectSongs.ts";
 import { scheduleSnapshot } from "db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
+import { MINUTE } from "@std/datetime";
 
 export const classifyVideoWorker = async (job: Job) => {
 	const client = await db.connect();
packages/crawler/mq/exec/collectSongs.ts (new file, 9 lines)
@@ -0,0 +1,9 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { collectSongs } from "mq/task/collectSongs.ts";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+
+export const collectSongsWorker = (_job: Job): Promise<void> =>
+	withDbConnection(async (client: Client) => {
+		await collectSongs(client);
+	});
packages/crawler/mq/exec/dispatchMilestoneSnapshots.ts (new file, 29 lines)
@@ -0,0 +1,29 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { getVideosNearMilestone } from "db/snapshot.ts";
+import { getAdjustedShortTermETA } from "mq/scheduling.ts";
+import { truncate } from "utils/truncate.ts";
+import { scheduleSnapshot } from "db/snapshotSchedule.ts";
+import logger from "log/logger.ts";
+import { HOUR, MINUTE, SECOND } from "@std/datetime";
+
+export const dispatchMilestoneSnapshotsWorker = (_job: Job): Promise<void> =>
+	withDbConnection(async (client: Client) => {
+		const videos = await getVideosNearMilestone(client);
+		for (const video of videos) {
+			const aid = Number(video.aid);
+			const eta = await getAdjustedShortTermETA(client, aid);
+			if (eta > 144) continue;
+			const now = Date.now();
+			const scheduledNextSnapshotDelay = eta * HOUR;
+			const maxInterval = 4 * HOUR;
+			const minInterval = 1 * SECOND;
+			const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
+			const targetTime = now + delay;
+			await scheduleSnapshot(client, aid, "milestone", targetTime);
+			logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
+		}
+	}, (e) => {
+		logger.error(e as Error, "mq", "fn:dispatchMilestoneSnapshotsWorker");
+	});
packages/crawler/mq/exec/dispatchRegularSnapshots.ts (new file, 39 lines)
@@ -0,0 +1,39 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { getLatestVideoSnapshot } from "db/snapshot.ts";
+import { truncate } from "utils/truncate.ts";
+import { getVideosWithoutActiveSnapshotSchedule, scheduleSnapshot } from "db/snapshotSchedule.ts";
+import logger from "log/logger.ts";
+import { HOUR, MINUTE, WEEK } from "@std/datetime";
+import { lockManager } from "../lockManager.ts";
+import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
+
+export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =>
+	await withDbConnection(async (client: Client) => {
+		const startedAt = Date.now();
+		if (await lockManager.isLocked("dispatchRegularSnapshots")) {
+			logger.log("dispatchRegularSnapshots is already running", "mq");
+			return;
+		}
+		await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
+
+		const aids = await getVideosWithoutActiveSnapshotSchedule(client);
+		for (const rawAid of aids) {
+			const aid = Number(rawAid);
+			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
+			const now = Date.now();
+			const lastSnapshotedAt = latestSnapshot?.time ?? now;
+			const interval = await getRegularSnapshotInterval(client, aid);
+			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+			const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
+			await scheduleSnapshot(client, aid, "normal", targetTime);
+			if (now - startedAt > 25 * MINUTE) {
+				return;
+			}
+		}
+	}, (e) => {
+		logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
+	}, async () => {
+		await lockManager.releaseLock("dispatchRegularSnapshots");
+	});
packages/crawler/mq/exec/executors.ts (new file, 10 lines)
@@ -0,0 +1,10 @@
+export * from "mq/exec/getLatestVideos.ts";
+export * from "./getVideoInfo.ts";
+export * from "./collectSongs.ts";
+export * from "./takeBulkSnapshot.ts";
+export * from "./archiveSnapshots.ts";
+export * from "./dispatchMilestoneSnapshots.ts";
+export * from "./dispatchRegularSnapshots.ts";
+export * from "./snapshotVideo.ts";
+export * from "./scheduleCleanup.ts";
+export * from "./snapshotTick.ts";
@@ -1,37 +1,9 @@
 import { Job } from "bullmq";
 import { queueLatestVideos } from "mq/task/queueLatestVideo.ts";
-import { db } from "db/init.ts";
-import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
-import { collectSongs } from "mq/task/collectSongs.ts";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 
-export const getLatestVideosWorker = async (_job: Job): Promise<void> => {
-	const client = await db.connect();
-	try {
+export const getLatestVideosWorker = (_job: Job): Promise<void> =>
+	withDbConnection(async (client: Client) => {
 		await queueLatestVideos(client);
-	} finally {
-		client.release();
-	}
-};
+	});
-
-export const collectSongsWorker = async (_job: Job): Promise<void> => {
-	const client = await db.connect();
-	try {
-		await collectSongs(client);
-	} finally {
-		client.release();
-	}
-};
-
-export const getVideoInfoWorker = async (job: Job): Promise<number> => {
-	const client = await db.connect();
-	try {
-		const aid = job.data.aid;
-		if (!aid) {
-			return 3;
-		}
-		await insertVideoInfo(client, aid);
-		return 0;
-	} finally {
-		client.release();
-	}
-};
packages/crawler/mq/exec/getVideoInfo.ts (new file, 15 lines)
@@ -0,0 +1,15 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { insertVideoInfo } from "mq/task/getVideoDetails.ts";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "log/logger.ts";
+
+export const getVideoInfoWorker = async (job: Job): Promise<void> =>
+	await withDbConnection<void>(async (client: Client) => {
+		const aid = job.data.aid;
+		if (!aid) {
+			logger.warn("aid does not exists", "mq", "job:getVideoInfo");
+			return;
+		}
+		await insertVideoInfo(client, aid);
+	});
packages/crawler/mq/exec/scheduleCleanup.ts (new file, 45 lines)
@@ -0,0 +1,45 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "log/logger.ts";
+import { scheduleSnapshot, setSnapshotStatus } from "db/snapshotSchedule.ts";
+import { SECOND } from "@std/datetime";
+import { getTimeoutSchedulesCount } from "mq/task/getTimeoutSchedulesCount.ts";
+import { removeAllTimeoutSchedules } from "mq/task/removeAllTimeoutSchedules.ts";
+
+export const scheduleCleanupWorker = async (_job: Job): Promise<void> =>
+	await withDbConnection<void>(async (client: Client) => {
+		if (await getTimeoutSchedulesCount(client) > 2000) {
+			await removeAllTimeoutSchedules(client);
+			return;
+		}
+
+		const query: string = `
+			SELECT id, aid, type
+			FROM snapshot_schedule
+			WHERE status IN ('pending', 'processing')
+				AND started_at < NOW() - INTERVAL '30 minutes'
+			UNION
+			SELECT id, aid, type
+			FROM snapshot_schedule
+			WHERE status IN ('pending', 'processing')
+				AND started_at < NOW() - INTERVAL '2 minutes'
+				AND type = 'milestone'
+		`;
+		const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
+		if (rows.length === 0) return;
+		for (const row of rows) {
+			const id = Number(row.id);
+			const aid = Number(row.aid);
+			const type = row.type;
+			await setSnapshotStatus(client, id, "timeout");
+			await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
+			logger.log(
+				`Schedule ${id} has not received any response in a while, rescheduled.`,
+				"mq",
+				"fn:scheduleCleanupWorker",
+			);
+		}
+	}, (e) => {
+		logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
+	});
@@ -1,46 +1,21 @@
 import { Job } from "bullmq";
 import { db } from "db/init.ts";
-import { getLatestVideoSnapshot, getVideosNearMilestone } from "db/snapshot.ts";
 import {
 	bulkGetVideosWithoutProcessingSchedules,
-	bulkScheduleSnapshot,
 	bulkSetSnapshotStatus,
-	findClosestSnapshot,
-	findSnapshotBefore, getAllVideosWithoutActiveSnapshotSchedule,
 	getBulkSnapshotsInNextSecond,
-	getLatestSnapshot,
 	getSnapshotsInNextSecond,
-	getVideosWithoutActiveSnapshotSchedule,
-	scheduleSnapshot,
 	setSnapshotStatus,
-	snapshotScheduleExists,
 	videoHasProcessingSchedule,
 } from "db/snapshotSchedule.ts";
-import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { HOUR, MINUTE, SECOND, WEEK } from "$std/datetime/constants.ts";
 import logger from "log/logger.ts";
 import { SnapshotQueue } from "mq/index.ts";
-import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
-import { NetSchedulerError } from "@core/net/delegate.ts";
-import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
-import { truncate } from "utils/truncate.ts";
-import { lockManager } from "mq/lockManager.ts";
-import { getSongsPublihsedAt } from "db/songs.ts";
-import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
-import { getAdjustedShortTermETA } from "../scheduling.ts";
-import {SnapshotScheduleType} from "@core/db/schema";
 
 const priorityMap: { [key: string]: number } = {
 	"milestone": 1,
 	"normal": 3,
 };
 
-const snapshotTypeToTaskMap: { [key: string]: string } = {
-	"milestone": "snapshotMilestoneVideo",
-	"normal": "snapshotVideo",
-	"new": "snapshotMilestoneVideo",
-};
-
 export const bulkSnapshotTickWorker = async (_job: Job) => {
 	const client = await db.connect();
 	try {
@@ -61,14 +36,14 @@ export const bulkSnapshotTickWorker = async (_job: Job) => {
 					created_at: schedule.created_at,
 					started_at: schedule.started_at,
 					finished_at: schedule.finished_at,
-					status: schedule.status
-				}
-			})
+					status: schedule.status,
+				};
+			});
 			await SnapshotQueue.add("bulkSnapshotVideo", {
 				schedules: schedulesData,
 			}, { priority: 3 });
 		}
-		return `OK`
+		return `OK`;
 	} catch (e) {
 		logger.error(e as Error);
 	} finally {
@@ -109,290 +84,3 @@ export const closetMilestone = (views: number) => {
 	if (views < 1000000) return 1000000;
 	return 10000000;
 };
-
-export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
-	const client = await db.connect();
-	try {
-		const videos = await getVideosNearMilestone(client);
-		for (const video of videos) {
-			const aid = Number(video.aid);
-			const eta = await getAdjustedShortTermETA(client, aid);
-			if (eta > 144) continue;
-			const now = Date.now();
-			const scheduledNextSnapshotDelay = eta * HOUR;
-			const maxInterval = 4 * HOUR;
-			const minInterval = 1 * SECOND;
-			const delay = truncate(scheduledNextSnapshotDelay, minInterval, maxInterval);
-			const targetTime = now + delay;
-			await scheduleSnapshot(client, aid, "milestone", targetTime);
-			logger.log(`Scheduled milestone snapshot for aid ${aid} in ${(delay / MINUTE).toFixed(2)} mins.`, "mq");
-		}
-	} catch (e) {
-		logger.error(e as Error, "mq", "fn:collectMilestoneSnapshotsWorker");
-	} finally {
-		client.release();
-	}
-};
-
-const getRegularSnapshotInterval = async (client: Client, aid: number) => {
-	const now = Date.now();
-	const date = new Date(now - 24 * HOUR);
-	let oldSnapshot = await findSnapshotBefore(client, aid, date);
-	if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
-	const latestSnapshot = await getLatestSnapshot(client, aid);
-	if (!oldSnapshot || !latestSnapshot) return 0;
-	if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
-	const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
-	if (hoursDiff < 8) return 24;
-	const viewsDiff = latestSnapshot.views - oldSnapshot.views;
-	if (viewsDiff === 0) return 72;
-	const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
-	if (speedPerDay < 6) return 36;
-	if (speedPerDay < 120) return 24;
-	if (speedPerDay < 320) return 12;
-	return 6;
-};
-
-export const archiveSnapshotsWorker = async (_job: Job) => {
-	const client = await db.connect();
-	const startedAt = Date.now();
-	if (await lockManager.isLocked("dispatchArchiveSnapshots")) {
-		logger.log("dispatchArchiveSnapshots is already running", "mq");
-		client.release();
-		return;
-	}
-	await lockManager.acquireLock("dispatchArchiveSnapshots", 30 * 60);
-	try {
-		const aids = await getAllVideosWithoutActiveSnapshotSchedule(client);
-		for (const rawAid of aids) {
-			const aid = Number(rawAid);
-			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
-			const now = Date.now();
-			const lastSnapshotedAt = latestSnapshot?.time ?? now;
-			const interval = 168;
-			logger.log(`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, "mq", "fn:archiveSnapshotsWorker");
-			const targetTime = lastSnapshotedAt + interval * HOUR;
-			await scheduleSnapshot(client, aid, "archive", targetTime);
-			if (now - startedAt > 250 * MINUTE) {
-				return;
-			}
-		}
-	} catch (e) {
-		logger.error(e as Error, "mq", "fn:archiveSnapshotsWorker");
-	} finally {
-		await lockManager.releaseLock("dispatchArchiveSnapshots");
-		client.release();
-	}
-};
-
-export const regularSnapshotsWorker = async (_job: Job) => {
-	const client = await db.connect();
-	const startedAt = Date.now();
-	if (await lockManager.isLocked("dispatchRegularSnapshots")) {
-		logger.log("dispatchRegularSnapshots is already running", "mq");
-		client.release();
-		return;
-	}
-	await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
-	try {
-		const aids = await getVideosWithoutActiveSnapshotSchedule(client);
-		for (const rawAid of aids) {
-			const aid = Number(rawAid);
-			const latestSnapshot = await getLatestVideoSnapshot(client, aid);
-			const now = Date.now();
-			const lastSnapshotedAt = latestSnapshot?.time ?? now;
-			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
-			const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
-			await scheduleSnapshot(client, aid, "normal", targetTime);
-			if (now - startedAt > 25 * MINUTE) {
-				return;
-			}
-		}
-	} catch (e) {
-		logger.error(e as Error, "mq", "fn:regularSnapshotsWorker");
-	} finally {
-		await lockManager.releaseLock("dispatchRegularSnapshots");
-		client.release();
-	}
-};
-
-export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
-	const schedules: SnapshotScheduleType[] = job.data.schedules;
-	const ids = schedules.map((schedule) => Number(schedule.id));
-	const aidsToFetch: number[] = [];
-	const client = await db.connect();
-	try {
-		for (const schedule of schedules) {
-			const aid = Number(schedule.aid);
-			const id = Number(schedule.id);
-			const exists = await snapshotScheduleExists(client, id);
-			if (!exists) {
-				continue;
-			}
-			aidsToFetch.push(aid);
-		}
-		const data = await bulkGetVideoStats(aidsToFetch);
-		if (typeof data === "number") {
-			await bulkSetSnapshotStatus(client, ids, "failed");
-			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
-			return `GET_BILI_STATUS_${data}`;
-		}
-		for (const video of data) {
-			const aid = video.id;
-			const stat = video.cnt_info;
-			const views = stat.play;
-			const danmakus = stat.danmaku;
-			const replies = stat.reply;
-			const likes = stat.thumb_up;
-			const coins = stat.coin;
-			const shares = stat.share;
-			const favorites = stat.collect;
-			const query: string = `
-				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
-				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
-			`;
-			await client.queryObject(
-				query,
-				[aid, views, danmakus, replies, likes, coins, shares, favorites],
-			);
-
-			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
-		}
-		await bulkSetSnapshotStatus(client, ids, "completed");
-
-		for (const schedule of schedules) {
-			const aid = Number(schedule.aid);
-			const type = schedule.type;
-			if (type == 'archive') continue;
-			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
-			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
-		}
-		return `DONE`;
-	} catch (e) {
-		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
-			logger.warn(
-				`No available proxy for bulk request now.`,
-				"mq",
-				"fn:takeBulkSnapshotForVideosWorker",
-			);
-			await bulkSetSnapshotStatus(client, ids, "no_proxy");
-			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
-			return;
-		}
-		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
-		await bulkSetSnapshotStatus(client, ids, "failed");
-	} finally {
-		client.release();
-	}
-};
-
-export const takeSnapshotForVideoWorker = async (job: Job) => {
-	const id = job.data.id;
-	const aid = Number(job.data.aid);
-	const type = job.data.type;
-	const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
-	const client = await db.connect();
-	const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
-	const exists = await snapshotScheduleExists(client, id);
-	if (!exists) {
-		client.release();
-		return;
-	}
-	const status = await getBiliVideoStatus(client, aid);
-	if (status !== 0) {
-		client.release();
-		return `REFUSE_WORKING_BILI_STATUS_${status}`;
-	}
-	try {
-		await setSnapshotStatus(client, id, "processing");
-		const stat = await insertVideoSnapshot(client, aid, task);
-		if (typeof stat === "number") {
-			await setBiliVideoStatus(client, aid, stat);
-			await setSnapshotStatus(client, id, "completed");
-			return `GET_BILI_STATUS_${stat}`;
-		}
-		await setSnapshotStatus(client, id, "completed");
-		if (type === "normal") {
-			const interval = await getRegularSnapshotInterval(client, aid);
-			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
-			await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
-			return `DONE`;
-		} else if (type === "new") {
-			const publihsedAt = await getSongsPublihsedAt(client, aid);
-			const timeSincePublished = stat.time - publihsedAt!;
-			const viewsPerHour = stat.views / timeSincePublished * HOUR;
-			if (timeSincePublished > 48 * HOUR) {
-				return `DONE`;
-			}
-			if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
-				return `DONE`;
-			}
-			let intervalMins = 240;
-			if (viewsPerHour > 50) {
-				intervalMins = 120;
-			}
-			if (viewsPerHour > 100) {
-				intervalMins = 60;
-			}
-			if (viewsPerHour > 1000) {
-				intervalMins = 15;
-			}
-			await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
-		}
-		if (type !== "milestone") return `DONE`;
-		const eta = await getAdjustedShortTermETA(client, aid);
-		if (eta > 144) return "ETA_TOO_LONG";
-		const now = Date.now();
-		const targetTime = now + eta * HOUR;
-		await scheduleSnapshot(client, aid, type, targetTime);
-		await setSnapshotStatus(client, id, "completed");
-		return `DONE`;
-	} catch (e) {
-		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
-			logger.warn(
-				`No available proxy for aid ${job.data.aid}.`,
-				"mq",
-				"fn:takeSnapshotForVideoWorker",
-			);
-			await setSnapshotStatus(client, id, "no_proxy");
-			await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
-			return;
-		}
-		logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
-		await setSnapshotStatus(client, id, "failed");
-	} finally {
-		client.release();
-	}
-};
-
-export const scheduleCleanupWorker = async (_job: Job) => {
-	const client = await db.connect();
-	try {
-		const query = `
-			SELECT id, aid, type
-			FROM snapshot_schedule
-			WHERE status IN ('pending', 'processing')
-				AND started_at < NOW() - INTERVAL '30 minutes'
-		`;
-		const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
-		if (rows.length === 0) return;
-		for (const row of rows) {
-			const id = Number(row.id);
-			const aid = Number(row.aid);
-			const type = row.type;
-			await setSnapshotStatus(client, id, "timeout");
-			await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
-			logger.log(
-				`Schedule ${id} has no response received for 5 minutes, rescheduled.`,
-				"mq",
-				"fn:scheduleCleanupWorker",
-			);
-		}
-	} catch (e) {
-		logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
-	} finally {
-		client.release();
-	}
-};
packages/crawler/mq/exec/snapshotVideo.ts (new file, 107 lines)
@@ -0,0 +1,107 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { withDbConnection } from "db/withConnection.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { scheduleSnapshot, setSnapshotStatus, snapshotScheduleExists } from "db/snapshotSchedule.ts";
+import logger from "log/logger.ts";
+import { HOUR, MINUTE, SECOND } from "@std/datetime";
+import { lockManager } from "mq/lockManager.ts";
+import { getBiliVideoStatus, setBiliVideoStatus } from "db/allData.ts";
+import { insertVideoSnapshot } from "mq/task/getVideoStats.ts";
+import { getSongsPublihsedAt } from "db/songs.ts";
+import { getAdjustedShortTermETA } from "mq/scheduling.ts";
+import { NetSchedulerError } from "@core/net/delegate.ts";
+
+const snapshotTypeToTaskMap: { [key: string]: string } = {
+	"milestone": "snapshotMilestoneVideo",
+	"normal": "snapshotVideo",
+	"new": "snapshotMilestoneVideo",
+};
+
+export const snapshotVideoWorker = async (job: Job): Promise<void> => {
+	const id = job.data.id;
+	const aid = Number(job.data.aid);
+	const type = job.data.type;
+	const task = snapshotTypeToTaskMap[type] ?? "snapshotVideo";
+	const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
+	await withDbConnection(async (client: Client) => {
+		const exists = await snapshotScheduleExists(client, id);
+		if (!exists) {
+			return;
+		}
+		const status = await getBiliVideoStatus(client, aid);
+		if (status !== 0) {
+			logger.warn(
+				`Video ${aid} has status ${status} in the database. Abort snapshoting.`,
+				"mq",
+				"fn:dispatchRegularSnapshotsWorker",
+			);
+			return;
+		}
+		await setSnapshotStatus(client, id, "processing");
+		const stat = await insertVideoSnapshot(client, aid, task);
+		if (typeof stat === "number") {
+			await setBiliVideoStatus(client, aid, stat);
+			await setSnapshotStatus(client, id, "bili_error");
+			logger.warn(
+				`Bilibili return status ${status} when snapshoting for ${aid}.`,
+				"mq",
+				"fn:dispatchRegularSnapshotsWorker",
+			);
+			return;
+		}
+		await setSnapshotStatus(client, id, "completed");
+		if (type === "new") {
+			const publihsedAt = await getSongsPublihsedAt(client, aid);
+			const timeSincePublished = stat.time - publihsedAt!;
+			const viewsPerHour = stat.views / timeSincePublished * HOUR;
+			if (timeSincePublished > 48 * HOUR) {
+				return;
+			}
+			if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
+				return;
+			}
+			let intervalMins = 240;
+			if (viewsPerHour > 50) {
+				intervalMins = 120;
+			}
+			if (viewsPerHour > 100) {
+				intervalMins = 60;
+			}
+			if (viewsPerHour > 1000) {
+				intervalMins = 15;
+			}
+			await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
+		}
+		if (type !== "milestone") return;
+		const eta = await getAdjustedShortTermETA(client, aid);
+		if (eta > 144) {
+			const etaHoursString = eta.toFixed(2) + " hrs";
+			logger.warn(
+				`ETA (${etaHoursString}) too long for milestone snapshot. aid: ${aid}.`,
+				"mq",
+				"fn:dispatchRegularSnapshotsWorker",
+			);
+		}
+		const now = Date.now();
+		const targetTime = now + eta * HOUR;
+		await scheduleSnapshot(client, aid, type, targetTime);
+		await setSnapshotStatus(client, id, "completed");
+		return;
+	}, async (e, client) => {
+		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
+			logger.warn(
+				`No available proxy for aid ${job.data.aid}.`,
+				"mq",
+				"fn:takeSnapshotForVideoWorker",
+			);
+			await setSnapshotStatus(client, id, "no_proxy");
+			await scheduleSnapshot(client, aid, type, Date.now() + retryInterval);
+			return;
+		}
+		logger.error(e as Error, "mq", "fn:takeSnapshotForVideoWorker");
+		await setSnapshotStatus(client, id, "failed");
+	}, async () => {
+		await lockManager.releaseLock("dispatchRegularSnapshots");
+	});
+	return;
+};
packages/crawler/mq/exec/takeBulkSnapshot.ts (new file, 85 lines)
@@ -0,0 +1,85 @@
+import { Job } from "npm:bullmq@5.45.2";
+import { db } from "db/init.ts";
+import {
+	bulkScheduleSnapshot,
+	bulkSetSnapshotStatus,
+	scheduleSnapshot,
+	snapshotScheduleExists,
+} from "db/snapshotSchedule.ts";
+import { bulkGetVideoStats } from "net/bulkGetVideoStats.ts";
+import logger from "log/logger.ts";
+import { NetSchedulerError } from "@core/net/delegate.ts";
+import { HOUR, MINUTE, SECOND } from "@std/datetime";
+import { getRegularSnapshotInterval } from "../task/regularSnapshotInterval.ts";
+import { SnapshotScheduleType } from "@core/db/schema";
+
+export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
+	const schedules: SnapshotScheduleType[] = job.data.schedules;
+	const ids = schedules.map((schedule) => Number(schedule.id));
+	const aidsToFetch: number[] = [];
+	const client = await db.connect();
+	try {
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const id = Number(schedule.id);
+			const exists = await snapshotScheduleExists(client, id);
+			if (!exists) {
+				continue;
+			}
+			aidsToFetch.push(aid);
+		}
+		const data = await bulkGetVideoStats(aidsToFetch);
+		if (typeof data === "number") {
+			await bulkSetSnapshotStatus(client, ids, "failed");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 15 * SECOND);
+			return `GET_BILI_STATUS_${data}`;
+		}
+		for (const video of data) {
+			const aid = video.id;
+			const stat = video.cnt_info;
+			const views = stat.play;
+			const danmakus = stat.danmaku;
+			const replies = stat.reply;
+			const likes = stat.thumb_up;
+			const coins = stat.coin;
+			const shares = stat.share;
+			const favorites = stat.collect;
+			const query: string = `
+				INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
+				VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
+			`;
+			await client.queryObject(
+				query,
+				[aid, views, danmakus, replies, likes, coins, shares, favorites],
+			);
+
+			logger.log(`Taken snapshot for video ${aid} in bulk.`, "net", "fn:takeBulkSnapshotForVideosWorker");
+		}
+		await bulkSetSnapshotStatus(client, ids, "completed");
+
+		for (const schedule of schedules) {
+			const aid = Number(schedule.aid);
+			const type = schedule.type;
+			if (type == "archive") continue;
+			const interval = await getRegularSnapshotInterval(client, aid);
+			logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
+			await scheduleSnapshot(client, aid, "normal", Date.now() + interval * HOUR);
+		}
+		return `DONE`;
+	} catch (e) {
+		if (e instanceof NetSchedulerError && e.code === "NO_PROXY_AVAILABLE") {
+			logger.warn(
+				`No available proxy for bulk request now.`,
+				"mq",
+				"fn:takeBulkSnapshotForVideosWorker",
+			);
+			await bulkSetSnapshotStatus(client, ids, "no_proxy");
+			await bulkScheduleSnapshot(client, aidsToFetch, "normal", Date.now() + 20 * MINUTE * Math.random());
+			return;
+		}
+		logger.error(e as Error, "mq", "fn:takeBulkSnapshotForVideosWorker");
+		await bulkSetSnapshotStatus(client, ids, "failed");
+	} finally {
+		client.release();
+	}
+};
@@ -1 +0,0 @@
-export * from "mq/exec/getLatestVideos.ts";
@@ -45,7 +45,7 @@ export async function initMQ() {
 		},
 	});
 
-	await SnapshotQueue.upsertJobScheduler("collectMilestoneSnapshots", {
+	await SnapshotQueue.upsertJobScheduler("dispatchMilestoneSnapshots", {
 		every: 5 * MINUTE,
 		immediately: true,
 	});
@@ -61,7 +61,7 @@ export async function initMQ() {
 	});
 
 	await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
-		every: 30 * MINUTE,
+		every: 2 * MINUTE,
 		immediately: true,
 	});
 
@@ -2,7 +2,7 @@ import { findClosestSnapshot, getLatestSnapshot, hasAtLeast2Snapshots } from "db
 import { truncate } from "utils/truncate.ts";
 import { closetMilestone } from "./exec/snapshotTick.ts";
 import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
-import { HOUR, MINUTE } from "$std/datetime/constants.ts";
+import { HOUR, MINUTE } from "@std/datetime";
 
 const log = (value: number, base: number = 10) => Math.log(value) / Math.log(base);
 
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { aidExistsInSongs, getNotCollectedSongs } from "db/songs.ts";
 import logger from "log/logger.ts";
 import { scheduleSnapshot } from "db/snapshotSchedule.ts";
-import { MINUTE } from "$std/datetime/constants.ts";
+import { MINUTE } from "@std/datetime";
 
 export async function collectSongs(client: Client) {
 	const aids = await getNotCollectedSongs(client);
packages/crawler/mq/task/getTimeoutSchedulesCount.ts (new file, 13 lines)
@@ -0,0 +1,13 @@
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+
+export async function getTimeoutSchedulesCount(client: Client) {
+	const query: string = `
+		SELECT COUNT(id)
+		FROM snapshot_schedule
+		WHERE status IN ('pending', 'processing')
+			AND started_at < NOW() - INTERVAL '30 minutes'
+	`;
+
+	const { rows } = await client.queryObject<{ count: number }>(query);
+	return rows[0].count;
+}
@@ -4,7 +4,7 @@ import { formatTimestampToPsql } from "utils/formatTimestampToPostgre.ts";
 import logger from "log/logger.ts";
 import { ClassifyVideoQueue } from "mq/index.ts";
 import { userExistsInBiliUsers, videoExistsInAllData } from "db/allData.ts";
-import { HOUR, SECOND } from "$std/datetime/constants.ts";
+import { HOUR, SECOND } from "@std/datetime";
 
 export async function insertVideoInfo(client: Client, aid: number) {
 	const videoExists = await videoExistsInAllData(client, aid);
@@ -2,7 +2,7 @@ import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
 import { getLatestVideoAids } from "net/getLatestVideoAids.ts";
 import { videoExistsInAllData } from "db/allData.ts";
 import { sleep } from "utils/sleep.ts";
-import { SECOND } from "$std/datetime/constants.ts";
+import { SECOND } from "@std/datetime";
 import logger from "log/logger.ts";
 import { LatestVideosQueue } from "mq/index.ts";
 
packages/crawler/mq/task/regularSnapshotInterval.ts (new file, 22 lines)
@@ -0,0 +1,22 @@
+import { findClosestSnapshot, findSnapshotBefore, getLatestSnapshot } from "db/snapshotSchedule.ts";
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import { HOUR } from "@std/datetime";
+
+export const getRegularSnapshotInterval = async (client: Client, aid: number) => {
+	const now = Date.now();
+	const date = new Date(now - 24 * HOUR);
+	let oldSnapshot = await findSnapshotBefore(client, aid, date);
+	if (!oldSnapshot) oldSnapshot = await findClosestSnapshot(client, aid, date);
+	const latestSnapshot = await getLatestSnapshot(client, aid);
+	if (!oldSnapshot || !latestSnapshot) return 0;
+	if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
+	const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
+	if (hoursDiff < 8) return 24;
+	const viewsDiff = latestSnapshot.views - oldSnapshot.views;
+	if (viewsDiff === 0) return 72;
+	const speedPerDay = viewsDiff / (hoursDiff + 0.001) * 24;
+	if (speedPerDay < 6) return 36;
+	if (speedPerDay < 120) return 24;
+	if (speedPerDay < 320) return 12;
+	return 6;
+};
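A worked example of the interval heuristic above (illustrative numbers, not from this change): if the reference snapshot and the latest snapshot are 20 hours apart and the video gained 2,000 views in between, speedPerDay is about 2000 / 20.001 * 24 ≈ 2400, which clears every threshold, so the next regular snapshot is scheduled 6 hours out; with zero new views in the same window the video would instead wait 72 hours.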
packages/crawler/mq/task/removeAllTimeoutSchedules.ts (new file, 16 lines)
@@ -0,0 +1,16 @@
+import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
+import logger from "log/logger.ts";
+
+export async function removeAllTimeoutSchedules(client: Client) {
+	logger.log(
+		"Too many timeout schedules, directly removing these schedules...",
+		"mq",
+		"fn:scheduleCleanupWorker",
+	);
+	const query: string = `
+		DELETE FROM snapshot_schedule
+		WHERE status IN ('pending', 'processing')
+			AND started_at < NOW() - INTERVAL '30 minutes'
+	`;
+	await client.queryObject(query);
+}
@@ -48,4 +48,4 @@ export async function getVideoInfoByBV(bvid: string, task: string): Promise<Vide
 		return data.code;
 	}
 	return data.data;
 }
@@ -1,32 +1,33 @@
 import { ConnectionOptions, Job, Worker } from "bullmq";
-import { collectSongsWorker, getLatestVideosWorker } from "mq/executors.ts";
+import {
+	archiveSnapshotsWorker,
+	bulkSnapshotTickWorker,
+	collectSongsWorker,
+	dispatchMilestoneSnapshotsWorker,
+	dispatchRegularSnapshotsWorker,
+	getLatestVideosWorker,
+	getVideoInfoWorker,
+	scheduleCleanupWorker,
+	snapshotTickWorker,
+	snapshotVideoWorker,
+	takeBulkSnapshotForVideosWorker,
+} from "mq/exec/executors.ts";
 import { redis } from "@core/db/redis.ts";
 import logger from "log/logger.ts";
 import { lockManager } from "mq/lockManager.ts";
 import { WorkerError } from "mq/schema.ts";
-import { getVideoInfoWorker } from "mq/exec/getLatestVideos.ts";
-import {
-	archiveSnapshotsWorker,
-	bulkSnapshotTickWorker,
-	collectMilestoneSnapshotsWorker,
-	regularSnapshotsWorker,
-	scheduleCleanupWorker,
-	snapshotTickWorker,
-	takeBulkSnapshotForVideosWorker,
-	takeSnapshotForVideoWorker,
-} from "mq/exec/snapshotTick.ts";
 
 const releaseLockForJob = async (name: string) => {
 	await lockManager.releaseLock(name);
 	logger.log(`Released lock: ${name}`, "mq");
-}
+};
 
 const releaseAllLocks = async () => {
 	const locks = ["dispatchRegularSnapshots", "dispatchArchiveSnapshots", "getLatestVideos"];
 	for (const lock of locks) {
 		await releaseLockForJob(lock);
 	}
-}
+};
 
 Deno.addSignalListener("SIGINT", async () => {
 	logger.log("SIGINT Received: Shutting down workers...", "mq");
@@ -80,13 +81,13 @@ const snapshotWorker = new Worker(
 	async (job: Job) => {
 		switch (job.name) {
 			case "snapshotVideo":
-				return await takeSnapshotForVideoWorker(job);
+				return await snapshotVideoWorker(job);
 			case "snapshotTick":
 				return await snapshotTickWorker(job);
-			case "collectMilestoneSnapshots":
-				return await collectMilestoneSnapshotsWorker(job);
+			case "dispatchMilestoneSnapshots":
+				return await dispatchMilestoneSnapshotsWorker(job);
 			case "dispatchRegularSnapshots":
-				return await regularSnapshotsWorker(job);
+				return await dispatchRegularSnapshotsWorker(job);
 			case "scheduleCleanup":
 				return await scheduleCleanupWorker(job);
 			case "bulkSnapshotVideo":
@@ -105,4 +106,4 @@
 snapshotWorker.on("error", (err) => {
 	const e = err as WorkerError;
 	logger.error(e.rawError, e.service, e.codePath);
 });