Compare commits

..

No commits in common. "d44ba8a0aee3f93802905a3bbb1dada481349fdc" and "0455abce2e9916cc1ad0d3375b3894b789c13623" have entirely different histories.

13 changed files with 134 additions and 334 deletions

View File

@ -12,7 +12,7 @@
"update": "deno run -A -r https://fresh.deno.dev/update .",
"worker:main": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write --allow-run ./src/worker.ts",
"worker:filter": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net --allow-write ./src/filterWorker.ts",
"adder": "deno run --env-file=.env --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"adder": "deno run --allow-env --allow-read --allow-ffi --allow-net ./src/jobAdder.ts",
"bullui": "deno run --allow-read --allow-env --allow-ffi --allow-net ./src/bullui.ts",
"all": "concurrently 'deno task worker:main' 'deno task adder' 'deno task bullui' 'deno task worker:filter'",
"test": "deno test ./test/ --allow-env --allow-ffi --allow-read --allow-net --allow-write --allow-run"

View File

@ -75,13 +75,3 @@ export async function setBiliVideoStatus(client: Client, aid: number, status: nu
[status, aid],
);
}
export async function getBiliVideoStatus(client: Client, aid: number) {
const queryResult = await client.queryObject<{ status: number }>(
`SELECT status FROM bilibili_metadata WHERE aid = $1`,
[aid],
);
const rows = queryResult.rows;
if (rows.length === 0) return 0;
return rows[0].status;
}

View File

@ -7,7 +7,6 @@ export async function getVideosNearMilestone(client: Client) {
FROM latest_video_snapshot ls
INNER JOIN
songs s ON ls.aid = s.aid
AND s.deleted = false
WHERE
s.deleted = false AND
(views >= 90000 AND views < 100000) OR

View File

@ -3,62 +3,6 @@ import { formatTimestampToPsql } from "lib/utils/formatTimestampToPostgre.ts";
import { SnapshotScheduleType } from "./schema.d.ts";
import logger from "lib/log/logger.ts";
import { MINUTE } from "$std/datetime/constants.ts";
import { redis } from "lib/db/redis.ts";
import { Redis } from "ioredis";
const WINDOW_SIZE = 2880;
const REDIS_KEY = "cvsa:snapshot_window_counts";
let lastAvailableWindow: { offset: number; count: number } | null = null;
function getCurrentWindowIndex(): number {
const now = new Date();
const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes();
const currentWindow = Math.floor(minutesSinceMidnight / 5);
return currentWindow;
}
export async function refreshSnapshotWindowCounts(client: Client, redisClient: Redis) {
const now = new Date();
const startTime = now.getTime();
const result = await client.queryObject<{ window_start: Date; count: number }>`
SELECT
date_trunc('hour', started_at) +
(EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start,
COUNT(*) AS count
FROM snapshot_schedule
WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '10 days'
GROUP BY 1
ORDER BY window_start
`
await redisClient.del(REDIS_KEY);
const currentWindow = getCurrentWindowIndex();
for (const row of result.rows) {
const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE));
const offset = (currentWindow + targetOffset);
if (offset >= 0 && offset < WINDOW_SIZE) {
await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count));
}
}
lastAvailableWindow = null;
}
export async function initSnapshotWindowCounts(client: Client, redisClient: Redis) {
await refreshSnapshotWindowCounts(client, redisClient);
setInterval(async () => {
await refreshSnapshotWindowCounts(client, redisClient);
}, 5 * MINUTE);
}
async function getWindowCount(redisClient: Redis, offset: number): Promise<number> {
const count = await redisClient.hget(REDIS_KEY, offset.toString());
return count ? parseInt(count, 10) : 0;
}
export async function snapshotScheduleExists(client: Client, id: number) {
const res = await client.queryObject<{ id: number }>(
@ -68,6 +12,9 @@ export async function snapshotScheduleExists(client: Client, id: number) {
return res.rows.length > 0;
}
/*
Returns true if the specified `aid` has at least one record with "pending" or "processing" status.
*/
export async function videoHasActiveSchedule(client: Client, aid: number) {
const res = await client.queryObject<{ status: string }>(
`SELECT status FROM snapshot_schedule WHERE aid = $1 AND (status = 'pending' OR status = 'processing')`,
@ -113,31 +60,6 @@ export async function findClosestSnapshot(
};
}
export async function findSnapshotBefore(
client: Client,
aid: number,
targetTime: Date,
): Promise<Snapshot | null> {
const query = `
SELECT created_at, views
FROM video_snapshot
WHERE aid = $1
AND created_at <= $2::timestamptz
ORDER BY created_at DESC
LIMIT 1
`;
const result = await client.queryObject<{ created_at: string; views: number }>(
query,
[aid, targetTime.toISOString()],
);
if (result.rows.length === 0) return null;
const row = result.rows[0];
return {
created_at: new Date(row.created_at).getTime(),
views: row.views,
};
}
export async function hasAtLeast2Snapshots(client: Client, aid: number) {
const res = await client.queryObject<{ count: number }>(
`SELECT COUNT(*) FROM video_snapshot WHERE aid = $1`,
@ -183,12 +105,10 @@ export async function getSnapshotScheduleCountWithinRange(client: Client, start:
* @param aid The aid of the video.
* @param targetTime Scheduled time for snapshot. (Timestamp in milliseconds)
*/
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number, force: boolean = false) {
if (await videoHasActiveSchedule(client, aid) && !force) return;
let adjustedTime = new Date(targetTime);
if (type !== "milestone" && type !== "new") {
adjustedTime = await adjustSnapshotTime(new Date(targetTime), 1000, redis);
}
export async function scheduleSnapshot(client: Client, aid: number, type: string, targetTime: number) {
if (await videoHasActiveSchedule(client, aid)) return;
const allowedCount = type === "milestone" ? 2000 : 800;
const adjustedTime = await adjustSnapshotTime(client, new Date(targetTime), allowedCount);
logger.log(`Scheduled snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", "fn:scheduleSnapshot");
return client.queryObject(
`INSERT INTO snapshot_schedule (aid, type, started_at) VALUES ($1, $2, $3)`,
@ -196,56 +116,76 @@ export async function scheduleSnapshot(client: Client, aid: number, type: string
);
}
/**
* Adjust the trigger time of the snapshot to ensure it does not exceed the frequency limit
* @param client PostgreSQL client
* @param expectedStartTime The expected snapshot time
* @param allowedCounts The number of snapshots allowed in a 5-minute window (default: 2000)
* @returns The adjusted actual snapshot time within the first available window
*/
export async function adjustSnapshotTime(
client: Client,
expectedStartTime: Date,
allowedCounts: number = 1000,
redisClient: Redis,
allowedCounts: number = 2000,
): Promise<Date> {
const currentWindow = getCurrentWindowIndex();
const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6;
// Query to find the closest available window by checking both past and future windows
const findWindowQuery = `
WITH base AS (
SELECT
date_trunc('minute', $1::timestamp)
- (EXTRACT(minute FROM $1::timestamp)::int % 5 * INTERVAL '1 minute') AS base_time
),
offsets AS (
SELECT generate_series(-100, 100) AS "offset"
),
candidate_windows AS (
SELECT
(base.base_time + ("offset" * INTERVAL '5 minutes')) AS window_start,
ABS("offset") AS distance
FROM base
CROSS JOIN offsets
)
SELECT
window_start
FROM
candidate_windows cw
LEFT JOIN
snapshot_schedule s
ON
s.started_at >= cw.window_start
AND s.started_at < cw.window_start + INTERVAL '5 minutes'
AND s.status = 'pending'
GROUP BY
cw.window_start, cw.distance
HAVING
COUNT(s.*) < $2
ORDER BY
cw.distance, cw.window_start
LIMIT 1;
`;
let initialOffset = currentWindow + Math.max(targetOffset, 0);
try {
// Execute query to find the first available window
const windowResult = await client.queryObject<{ window_start: Date }>(
findWindowQuery,
[expectedStartTime, allowedCounts],
);
if (lastAvailableWindow && lastAvailableWindow.count < allowedCounts) {
initialOffset = Math.max(lastAvailableWindow.offset - 2, 0);
}
let timePerIteration = 0;
const t = performance.now();
for (let i = initialOffset; i < WINDOW_SIZE; i++) {
const offset = i;
const count = await getWindowCount(redisClient, offset);
if (count < allowedCounts) {
const newCount = await redisClient.hincrby(REDIS_KEY, offset.toString(), 1);
lastAvailableWindow = { offset, count: newCount };
const startPoint = new Date();
startPoint.setHours(0, 0, 0, 0);
const startTime = startPoint.getTime();
const windowStart = startTime + offset * 5 * MINUTE;
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
const delayedDate = new Date(windowStart + randomDelay);
const now = new Date();
if (delayedDate.getTime() < now.getTime()) {
const elapsed = performance.now() - t;
timePerIteration = elapsed / (i+1);
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
return now;
}
const elapsed = performance.now() - t;
timePerIteration = elapsed / (i+1);
logger.log(`${timePerIteration.toFixed(3)}ms * ${i+1}iterations`, "perf", "fn:adjustSnapshotTime");
return delayedDate;
}
}
const elapsed = performance.now() - t;
timePerIteration = elapsed / WINDOW_SIZE;
logger.log(`${timePerIteration.toFixed(3)}ms * ${WINDOW_SIZE}iterations`, "perf", "fn:adjustSnapshotTime");
// If no available window found, return original time (may exceed limit)
if (windowResult.rows.length === 0) {
return expectedStartTime;
}
// Get the target window start time
const windowStart = windowResult.rows[0].window_start;
// Add random delay within the 5-minute window to distribute load
const randomDelay = Math.floor(Math.random() * 5 * MINUTE);
return new Date(windowStart.getTime() + randomDelay);
} catch {
return expectedStartTime; // Fallback to original time
}
}
export async function getSnapshotsInNextSecond(client: Client) {
const query = `
@ -258,7 +198,7 @@ export async function getSnapshotsInNextSecond(client: Client) {
ELSE 1
END,
started_at
LIMIT 10;
LIMIT 3;
`;
const res = await client.queryObject<SnapshotScheduleType>(query, []);
return res.rows;

View File

@ -1,5 +1,4 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { parseTimestampFromPsql } from "lib/utils/formatTimestampToPostgre.ts";
export async function getNotCollectedSongs(client: Client) {
const queryResult = await client.queryObject<{ aid: number }>(`
@ -28,18 +27,3 @@ export async function aidExistsInSongs(client: Client, aid: number) {
);
return queryResult.rows[0].exists;
}
export async function getSongsPublihsedAt(client: Client, aid: number) {
const queryResult = await client.queryObject<{ published_at: string }>(
`
SELECT published_at
FROM songs
WHERE aid = $1;
`,
[aid],
);
if (queryResult.rows.length === 0) {
return null;
}
return parseTimestampFromPsql(queryResult.rows[0].published_at);
}

View File

@ -7,8 +7,6 @@ import logger from "lib/log/logger.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { aidExistsInSongs } from "lib/db/songs.ts";
import { insertIntoSongs } from "lib/mq/task/collectSongs.ts";
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export const classifyVideoWorker = async (job: Job) => {
const client = await db.connect();
@ -29,7 +27,6 @@ export const classifyVideoWorker = async (job: Job) => {
const exists = await aidExistsInSongs(client, aid);
if (!exists && label !== 0) {
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
await insertIntoSongs(client, aid);
}

View File

@ -3,7 +3,6 @@ import { db } from "lib/db/init.ts";
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
import {
findClosestSnapshot,
findSnapshotBefore,
getLatestSnapshot,
getSnapshotsInNextSecond,
getVideosWithoutActiveSnapshotSchedule,
@ -19,10 +18,9 @@ import logger from "lib/log/logger.ts";
import { SnapshotQueue } from "lib/mq/index.ts";
import { insertVideoSnapshot } from "lib/mq/task/getVideoStats.ts";
import { NetSchedulerError } from "lib/mq/scheduler.ts";
import { getBiliVideoStatus, setBiliVideoStatus } from "lib/db/allData.ts";
import { setBiliVideoStatus } from "lib/db/allData.ts";
import { truncate } from "lib/utils/truncate.ts";
import { lockManager } from "lib/mq/lockManager.ts";
import { getSongsPublihsedAt } from "lib/db/songs.ts";
const priorityMap: { [key: string]: number } = {
"milestone": 1,
@ -32,7 +30,6 @@ const priorityMap: { [key: string]: number } = {
const snapshotTypeToTaskMap: { [key: string]: string } = {
"milestone": "snapshotMilestoneVideo",
"normal": "snapshotVideo",
"new": "snapshotMilestoneVideo",
};
export const snapshotTickWorker = async (_job: Job) => {
@ -40,15 +37,11 @@ export const snapshotTickWorker = async (_job: Job) => {
try {
const schedules = await getSnapshotsInNextSecond(client);
for (const schedule of schedules) {
if (await videoHasProcessingSchedule(client, schedule.aid)) {
return `ALREADY_PROCESSING`;
}
let priority = 3;
if (schedule.type && priorityMap[schedule.type]) {
priority = priorityMap[schedule.type];
}
const aid = Number(schedule.aid);
await setSnapshotStatus(client, schedule.id, "processing");
await SnapshotQueue.add("snapshotVideo", {
aid: aid,
id: Number(schedule.id),
@ -137,30 +130,11 @@ export const collectMilestoneSnapshotsWorker = async (_job: Job) => {
}
};
const getRegularSnapshotInterval = async (client: Client, aid: number) => {
const now = Date.now();
const date = new Date(now - 24 * HOUR);
const oldSnapshot = await findSnapshotBefore(client, aid, date);
const latestSnapshot = await getLatestSnapshot(client, aid);
if (!oldSnapshot || !latestSnapshot) return 0;
if (oldSnapshot.created_at === latestSnapshot.created_at) return 0;
const hoursDiff = (latestSnapshot.created_at - oldSnapshot.created_at) / HOUR;
if (hoursDiff < 8) return 24;
const viewsDiff = latestSnapshot.views - oldSnapshot.views;
if (viewsDiff === 0) return 72;
const speedPerDay = viewsDiff / hoursDiff * 24;
if (speedPerDay < 6) return 36;
if (speedPerDay < 120) return 24;
if (speedPerDay < 320) return 12;
return 6;
};
export const regularSnapshotsWorker = async (_job: Job) => {
const client = await db.connect();
const startedAt = Date.now();
if (await lockManager.isLocked("dispatchRegularSnapshots")) {
logger.log("dispatchRegularSnapshots is already running", "mq");
client.release();
return;
}
await lockManager.acquireLock("dispatchRegularSnapshots", 30 * 60);
@ -171,9 +145,7 @@ export const regularSnapshotsWorker = async (_job: Job) => {
const latestSnapshot = await getLatestVideoSnapshot(client, aid);
const now = Date.now();
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
const targetTime = truncate(lastSnapshotedAt + 24 * HOUR, now + 1, now + 100000 * WEEK);
await scheduleSnapshot(client, aid, "normal", targetTime);
if (now - startedAt > 25 * MINUTE) {
return;
@ -196,49 +168,23 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
const retryInterval = type === "milestone" ? 5 * SECOND : 2 * MINUTE;
const exists = await snapshotScheduleExists(client, id);
if (!exists) {
client.release();
return;
}
const status = await getBiliVideoStatus(client, aid);
if (status !== 0) {
client.release();
return `REFUSE_WORKING_BILI_STATUS_${status}`;
}
try {
if (await videoHasProcessingSchedule(client, aid)) {
return `ALREADY_PROCESSING`;
}
await setSnapshotStatus(client, id, "processing");
const stat = await insertVideoSnapshot(client, aid, task);
if (typeof stat === "number") {
await setBiliVideoStatus(client, aid, stat);
await setSnapshotStatus(client, id, "completed");
return `GET_BILI_STATUS_${stat}`;
return `BILI_STATUS_${stat}`;
}
await setSnapshotStatus(client, id, "completed");
if (type === "normal") {
const interval = await getRegularSnapshotInterval(client, aid);
logger.log(`Schedule regular snapshot for aid ${aid} in ${interval} hours.`, "mq")
await scheduleSnapshot(client, aid, type, Date.now() + interval * HOUR);
await scheduleSnapshot(client, aid, type, Date.now() + 24 * HOUR);
return `DONE`;
} else if (type === "new") {
const publihsedAt = await getSongsPublihsedAt(client, aid);
const timeSincePublished = stat.time - publihsedAt!;
const viewsPerHour = stat.views / timeSincePublished * HOUR;
if (timeSincePublished > 48 * HOUR) {
return `DONE`;
}
if (timeSincePublished > 2 * HOUR && viewsPerHour < 10) {
return `DONE`;
}
let intervalMins = 240;
if (viewsPerHour > 50) {
intervalMins = 120;
}
if (viewsPerHour > 100) {
intervalMins = 60;
}
if (viewsPerHour > 1000) {
intervalMins = 15;
}
await scheduleSnapshot(client, aid, type, Date.now() + intervalMins * MINUTE, true);
}
if (type !== "milestone") return `DONE`;
const eta = await getAdjustedShortTermETA(client, aid);
@ -264,29 +210,3 @@ export const takeSnapshotForVideoWorker = async (job: Job) => {
client.release();
}
};
export const scheduleCleanupWorker = async (_job: Job) => {
const client = await db.connect();
try {
const query = `
SELECT id, aid, type
FROM snapshot_schedule
WHERE status IN ('pending', 'processing')
AND started_at < NOW() - INTERVAL '5 minutes'
`;
const { rows } = await client.queryObject<{ id: bigint; aid: bigint; type: string }>(query);
if (rows.length === 0) return;
for (const row of rows) {
const id = Number(row.id);
const aid = Number(row.aid);
const type = row.type;
await setSnapshotStatus(client, id, "timeout");
await scheduleSnapshot(client, aid, type, Date.now() + 10 * SECOND);
logger.log(`Schedule ${id} has no response received for 5 minutes, rescheduled.`, "mq", "fn:scheduleCleanupWorker")
}
} catch (e) {
logger.error(e as Error, "mq", "fn:scheduleCleanupWorker");
} finally {
client.release();
}
};

View File

@ -1,30 +1,20 @@
import { MINUTE, SECOND } from "$std/datetime/constants.ts";
import { ClassifyVideoQueue, LatestVideosQueue, SnapshotQueue } from "lib/mq/index.ts";
import logger from "lib/log/logger.ts";
import { initSnapshotWindowCounts } from "lib/db/snapshotSchedule.ts";
import { db } from "lib/db/init.ts";
import { redis } from "lib/db/redis.ts";
export async function initMQ() {
const client = await db.connect();
try {
await initSnapshotWindowCounts(client, redis);
await LatestVideosQueue.upsertJobScheduler("getLatestVideos", {
every: 1 * MINUTE,
immediately: true,
});
await ClassifyVideoQueue.upsertJobScheduler("classifyVideos", {
every: 5 * MINUTE,
immediately: true,
});
await LatestVideosQueue.upsertJobScheduler("collectSongs", {
every: 3 * MINUTE,
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("snapshotTick", {
every: 1 * SECOND,
immediately: true,
@ -45,13 +35,7 @@ export async function initMQ() {
immediately: true,
});
await SnapshotQueue.upsertJobScheduler("scheduleCleanup", {
every: 30 * MINUTE,
immediately: true,
});
await SnapshotQueue.removeJobScheduler("scheduleSnapshotTick");
logger.log("Message queue initialized.");
} finally {
client.release();
}
}

View File

@ -333,12 +333,6 @@ const biliLimiterConfig: RateLimiterConfig[] = [
},
];
const bili_test = biliLimiterConfig;
bili_test[0].max = 10;
bili_test[1].max = 36;
bili_test[2].max = 150;
bili_test[3].max = 1000;
/*
Execution order for setup:
@ -370,7 +364,7 @@ for (const region of regions) {
netScheduler.addTask("getVideoInfo", "bilibili", "all");
netScheduler.addTask("getLatestVideos", "bilibili", "all");
netScheduler.addTask("snapshotMilestoneVideo", "bilibili", regions.map((region) => `alicloud-${region}`));
netScheduler.addTask("snapshotVideo", "bili_test", [
netScheduler.addTask("snapshotVideo", "bilibili", [
"alicloud-qingdao",
"alicloud-shanghai",
"alicloud-zhangjiakou",
@ -381,8 +375,7 @@ netScheduler.addTask("snapshotVideo", "bili_test", [
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
netScheduler.setTaskLimiter("getLatestVideos", null);
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
netScheduler.setTaskLimiter("snapshotVideo", null);
netScheduler.setTaskLimiter("snapshotVideo", videoInfoRateLimiterConfig);
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
netScheduler.setProviderLimiter("bili_test", bili_test);
export default netScheduler;

View File

@ -1,8 +1,6 @@
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
import { aidExistsInSongs, getNotCollectedSongs } from "lib/db/songs.ts";
import logger from "lib/log/logger.ts";
import { scheduleSnapshot } from "lib/db/snapshotSchedule.ts";
import { MINUTE } from "$std/datetime/constants.ts";
export async function collectSongs(client: Client) {
const aids = await getNotCollectedSongs(client);
@ -10,7 +8,6 @@ export async function collectSongs(client: Client) {
const exists = await aidExistsInSongs(client, aid);
if (exists) continue;
await insertIntoSongs(client, aid);
await scheduleSnapshot(client, aid, "new", Date.now() + 10 * MINUTE, true);
logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
}
}

18
lib/net/bilibili.d.ts vendored
View File

@ -9,24 +9,6 @@ export type VideoListResponse = BaseResponse<VideoListData>;
export type VideoDetailsResponse = BaseResponse<VideoDetailsData>;
export type VideoTagsResponse = BaseResponse<VideoTagsData>;
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
type MediaListInfoData = MediaListInfoItem[];
interface MediaListInfoItem {
bvid: string;
id: number;
cnt_info: {
coin: number;
collect: number;
danmaku: number;
play: number;
reply: number;
share: number;
thumb_up: number;
}
}
interface VideoInfoData {
bvid: string;

View File

@ -10,7 +10,6 @@ import {
regularSnapshotsWorker,
snapshotTickWorker,
takeSnapshotForVideoWorker,
scheduleCleanupWorker
} from "lib/mq/exec/snapshotTick.ts";
Deno.addSignalListener("SIGINT", async () => {
@ -81,14 +80,11 @@ const snapshotWorker = new Worker(
case "dispatchRegularSnapshots":
await regularSnapshotsWorker(job);
break;
case "scheduleCleanup":
await scheduleCleanupWorker(job);
break;
default:
break;
}
},
{ connection: redis as ConnectionOptions, concurrency: 50, removeOnComplete: { count: 2000 } },
{ connection: redis as ConnectionOptions, concurrency: 10, removeOnComplete: { count: 2000 } },
);
snapshotWorker.on("error", (err) => {
@ -98,4 +94,4 @@ snapshotWorker.on("error", (err) => {
snapshotWorker.on("closed", async () => {
await lockManager.releaseLock("dispatchRegularSnapshots");
});
})

View File

@ -0,0 +1,18 @@
import { assertEquals, assertInstanceOf, assertNotEquals } from "@std/assert";
import { findClosestSnapshot } from "lib/db/snapshotSchedule.ts";
import { postgresConfig } from "lib/db/pgConfig.ts";
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
Deno.test("Snapshot Schedule - getShortTermTimeFeaturesForVideo", async () => {
const client = new Client(postgresConfig);
try {
const result = await findClosestSnapshot(client, 247308539, new Date(1741983383000));
assertNotEquals(result, null);
const created_at = result!.created_at;
const views = result!.views;
assertInstanceOf(created_at, Date);
assertEquals(typeof views, "number");
} finally {
client.end();
}
});