diff --git a/.prettierrc b/.prettierrc index 0961adb..d8cc397 100644 --- a/.prettierrc +++ b/.prettierrc @@ -3,6 +3,6 @@ "tabWidth": 4, "trailingComma": "none", "singleQuote": false, - "printWidth": 120, + "printWidth": 100, "endOfLine": "lf" } diff --git a/bun.lock b/bun.lock index 8f40768..36839ca 100644 --- a/bun.lock +++ b/bun.lock @@ -63,6 +63,7 @@ "@bull-board/express": "^6.9.5", "@huggingface/transformers": "^3.5.1", "bullmq": "^5.52.1", + "date-fns": "^4.1.0", "express": "^5.1.0", "ioredis": "^5.6.1", "onnxruntime-node": "1.19.2", diff --git a/packages/core/db/snapshots/milestone.ts b/packages/core/db/snapshots/milestone.ts index c6b6422..5bad4e8 100644 --- a/packages/core/db/snapshots/milestone.ts +++ b/packages/core/db/snapshots/milestone.ts @@ -1,10 +1,10 @@ import { MINUTE, HOUR, getClosetMilestone } from "@core/lib"; import { getLatestSnapshot, getClosestSnapshot } from "@core/db"; -export const getShortTermETA = async (aid: number, targetViews?: number): Promise => { +export const getMilestoneETA = async (aid: number, targetViews?: number): Promise => { const DELTA = 1e-5; let minETAHours = Infinity; - const timeIntervals = [20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 24 * HOUR, 72 * HOUR, 168 * HOUR]; + const timeIntervals = [3 * HOUR, 24 * HOUR, 96 * HOUR]; const currentTimestamp = new Date().getTime(); const latestSnapshot = await getLatestSnapshot(aid); const latestSnapshotTime = new Date(latestSnapshot.time).getTime(); diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index 0f4223e..db46319 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -341,3 +341,21 @@ export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, ty `; return rows.map((r) => Number(r.aid)); } + +export async function getCommonArchiveAids(sql: Psql) { + const rows = await sql<{ aid: string }[]>` + SELECT b.aid + FROM bilibili_metadata b + LEFT JOIN snapshot_schedule ss ON + b.aid = ss.aid AND + (ss.status = 'pending' OR ss.status = 'processing') AND + ss.type = 'archive' + WHERE ss.aid IS NULL + AND NOT EXISTS ( + SELECT 1 + FROM songs s + WHERE s.aid = b.aid + ) + `; + return rows.map((r) => Number(r.aid)); +} diff --git a/packages/crawler/mq/exec/archiveSnapshots.ts b/packages/crawler/mq/exec/archiveSnapshots.ts index 158ba4b..acfc54b 100644 --- a/packages/crawler/mq/exec/archiveSnapshots.ts +++ b/packages/crawler/mq/exec/archiveSnapshots.ts @@ -1,28 +1,34 @@ import { Job } from "bullmq"; -import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule"; +import { + getCommonArchiveAids, + getVideosWithoutActiveSnapshotScheduleByType, + scheduleSnapshot +} from "db/snapshotSchedule"; import logger from "@core/log"; import { lockManager } from "@core/mq/lockManager"; -import { getLatestVideoSnapshot } from "db/snapshot"; import { MINUTE } from "@core/lib"; import { sql } from "@core/db/dbNew"; +import { + nextMonday, + nextSaturday, + formatDistanceStrict, + intervalToDuration, + formatDuration +} from "date-fns"; -function getNextSaturdayMidnightTimestamp(): number { - const now = new Date(); - const currentDay = now.getDay(); - - let daysUntilNextSaturday = (6 - currentDay + 7) % 7; - - if (daysUntilNextSaturday === 0) { - daysUntilNextSaturday = 7; - } - - const nextSaturday = new Date(now); - nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday); - nextSaturday.setHours(0, 0, 0, 0); - - return nextSaturday.getTime(); +function randomTimestampBetween(start: Date, end: Date) { + const startMs = start.getTime(); + const endMs = end.getTime(); + const randomMs = startMs + Math.random() * (endMs - startMs); + return Math.floor(randomMs); } +const getRandomTimeInNextWeek = (): number => { + const secondMonday = nextMonday(new Date()); + const thirdMonday = nextMonday(secondMonday); + return randomTimestampBetween(secondMonday, thirdMonday); +}; + export const archiveSnapshotsWorker = async (_job: Job) => { try { const startedAt = Date.now(); @@ -34,17 +40,37 @@ export const archiveSnapshotsWorker = async (_job: Job) => { const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "archive"); for (const rawAid of aids) { const aid = Number(rawAid); - const latestSnapshot = await getLatestVideoSnapshot(sql, aid); const now = Date.now(); - const lastSnapshotedAt = latestSnapshot?.time ?? now; - const nextSatMidnight = getNextSaturdayMidnightTimestamp(); - const interval = nextSatMidnight - now; + const date = new Date(); + const formatted = formatDistanceStrict(date, nextSaturday(date).getTime(), { + unit: "hour" + }); logger.log( - `Scheduled archive snapshot for aid ${aid} in ${interval} hours.`, + `Scheduled archive snapshot for aid ${aid} in ${formatted}.`, + "mq", + "fn:archiveSnapshotsWorker" + ); + await scheduleSnapshot(sql, aid, "archive", nextSaturday(date).getTime()); + if (now - startedAt > 30 * MINUTE) { + return; + } + } + const aids2 = await getCommonArchiveAids(sql); + for (const rawAid of aids2) { + const aid = Number(rawAid); + const now = Date.now(); + const targetTime = getRandomTimeInNextWeek(); + const interval = intervalToDuration({ + start: new Date(), + end: new Date(targetTime) + }); + const formatted = formatDuration(interval, { format: ["days", "hours"] }); + + logger.log( + `Scheduled archive snapshot for aid ${aid} in ${formatted}.`, "mq", "fn:archiveSnapshotsWorker" ); - const targetTime = lastSnapshotedAt + interval; await scheduleSnapshot(sql, aid, "archive", targetTime); if (now - startedAt > 30 * MINUTE) { return; diff --git a/packages/crawler/package.json b/packages/crawler/package.json index cf0b6b0..f611778 100644 --- a/packages/crawler/package.json +++ b/packages/crawler/package.json @@ -19,9 +19,10 @@ "@bull-board/express": "^6.9.5", "@huggingface/transformers": "^3.5.1", "bullmq": "^5.52.1", + "date-fns": "^4.1.0", "express": "^5.1.0", "ioredis": "^5.6.1", - "postgres": "^3.4.5", - "onnxruntime-node": "1.19.2" + "onnxruntime-node": "1.19.2", + "postgres": "^3.4.5" } } diff --git a/packages/elysia/middlewares/timing.ts b/packages/elysia/middlewares/timing.ts new file mode 100644 index 0000000..b60818f --- /dev/null +++ b/packages/elysia/middlewares/timing.ts @@ -0,0 +1,278 @@ +import { + Elysia, + type MapResponse, + type Context, + type TraceEvent, + type TraceProcess +} from 'elysia' + +type MaybePromise = T | Promise + +class TimeLogger { + private startTimes: Map + private durations: Map + private totalStartTime: number | null + + constructor() { + this.startTimes = new Map() + this.durations = new Map() + this.totalStartTime = null + } + + startTime(name: string) { + this.startTimes.set(name, performance.now()) + } + + endTime(name: string) { + const startTime = this.startTimes.get(name) + if (startTime !== undefined) { + const duration = performance.now() - startTime + this.durations.set(name, duration) + this.startTimes.delete(name) + } + } + + getCompletedDurations() { + return Array.from(this.durations.entries()).map(([name, duration]) => ({ + name, + duration + })) + } + + startTotal(): void { + this.totalStartTime = performance.now() + } + + endTotal(): number | null { + if (this.totalStartTime === null) return null + return performance.now() - this.totalStartTime + } +} + +export interface ServerTimingOptions { + /** + * Should Elysia report data back to client via 'Server-Sent-Event' + */ + report?: boolean + /** + * Allow Server Timing to log specified life-cycle events + */ + trace?: { + /** + * Capture duration from request + * + * @default true + */ + request?: boolean + /** + * Capture duration from parse + * + * @default true + */ + parse?: boolean + /** + * Capture duration from transform + * + * @default true + */ + transform?: boolean + /** + * Capture duration from beforeHandle + * + * @default true + */ + beforeHandle?: boolean + /** + * Capture duration from handle + * + * @default true + */ + handle?: boolean + /** + * Capture duration from afterHandle + * + * @default true + */ + afterHandle?: boolean + /** + * Capture duration from mapResponse + * + * @default true + */ + error?: boolean + /** + * Capture duration from mapResponse + * + * @default true + */ + mapResponse?: boolean + /** + * Capture total duration from start to finish + * + * @default true + */ + total?: boolean + } + /** + * Determine whether or not Server Timing should be enabled + * + * @default NODE_ENV !== 'production' + */ + enabled?: boolean + /** + * A condition whether server timing should be log + * + * @default undefined + */ + allow?: + | MaybePromise + | ((context: Omit) => MaybePromise) + /** + * A custom mapResponse provided by user + * + * @default undefined + */ + mapResponse?: MapResponse +} + +const getLabel = ( + event: TraceEvent, + listener: ( + callback: (process: TraceProcess<'begin', true>) => unknown + ) => unknown, + write: (value: string) => void +) => { + listener(async ({ onStop, onEvent, total }) => { + let label = '' + + if (total === 0) return + + onEvent(({ name, index, onStop }) => { + onStop(({ elapsed }) => { + label += `${event}.${index}.${name || 'anon'};dur=${elapsed},` + }) + }) + + onStop(({ elapsed }) => { + label += `${event};dur=${elapsed},` + + write(label) + }) + }) +} + +export const serverTiming = ({ + allow, + enabled = process.env.NODE_ENV !== 'production', + trace: { + request: traceRequest = true, + parse: traceParse = true, + transform: traceTransform = true, + beforeHandle: traceBeforeHandle = true, + handle: traceHandle = true, + afterHandle: traceAfterHandle = true, + error: traceError = true, + mapResponse: traceMapResponse = true, + total: traceTotal = true + } = {}, + mapResponse +}: ServerTimingOptions = {}) => { + const app = new Elysia().decorate('timeLog', new TimeLogger()).trace( + { as: 'global' }, + async ({ + onRequest, + onParse, + onTransform, + onBeforeHandle, + onHandle, + onAfterHandle, + onMapResponse, + onError, + set, + context, + response, + context: { + request: { method } + } + }) => { + + if (!enabled) return + let label = '' + + const write = (nextValue: string) => { + label += nextValue + } + + let start: number + + onRequest(({ begin }) => { + context.timeLog.startTotal() + start = begin + }) + + if (traceRequest) getLabel('request', onRequest, write) + if (traceParse) getLabel('parse', onParse, write) + if (traceTransform) getLabel('transform', onTransform, write) + if (traceBeforeHandle) + getLabel('beforeHandle', onBeforeHandle, write) + if (traceAfterHandle) getLabel('afterHandle', onAfterHandle, write) + if (traceError) getLabel('error', onError, write) + if (traceMapResponse) getLabel('mapResponse', onMapResponse, write) + + if (traceHandle) + onHandle(({ name, onStop }) => { + onStop(({ elapsed }) => { + label += `handle.${name};dur=${elapsed},` + }) + }) + + onMapResponse(({ onStop }) => { + onStop(async ({ end }) => { + const completedDurations = + context.timeLog.getCompletedDurations() + if (completedDurations.length > 0) { + label += + completedDurations + .map( + ({ name, duration }) => + `${name};dur=${duration}` + ) + .join(', ') + ',' + } + const elapsed = context.timeLog.endTotal(); + + let allowed = allow + if (allowed instanceof Promise) allowed = await allowed + + if (traceTotal) label += `total;dur=${elapsed}` + else label = label.slice(0, -1) + + // ? Must wait until request is reported + switch (typeof allowed) { + case 'boolean': + if (allowed === false) + delete set.headers['Server-Timing'] + + set.headers['Server-Timing'] = label + + break + + case 'function': + if ((await allowed(context)) === false) + delete set.headers['Server-Timing'] + + set.headers['Server-Timing'] = label + + break + + default: + set.headers['Server-Timing'] = label + } + }) + }) + } + ) + return app +} + +export default serverTiming diff --git a/packages/elysia/routes/root/index.ts b/packages/elysia/routes/root/index.ts index 1b305da..31ed7b2 100644 --- a/packages/elysia/routes/root/index.ts +++ b/packages/elysia/routes/root/index.ts @@ -9,53 +9,54 @@ const SingerObj = t.Object({ message: t.Optional(t.String()) }); -export const rootHandler = new Elysia().get( - "/", - async () => { - let singer: Singer | Singer[]; - const shouldShowSpecialSinger = Math.random() < 0.016; - if (getSingerForBirthday().length !== 0) { - singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[]; - for (const s of singer) { - delete s.birthday; - s.message = `祝${s.name}生日快乐~`; +export const rootHandler = new Elysia() + .get( + "/", + async () => { + let singer: Singer | Singer[]; + const shouldShowSpecialSinger = Math.random() < 0.016; + if (getSingerForBirthday().length !== 0) { + singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[]; + for (const s of singer) { + delete s.birthday; + s.message = `祝${s.name}生日快乐~`; + } + } else if (shouldShowSpecialSinger) { + singer = pickSpecialSinger(); + } else { + singer = pickSinger(); } - } else if (shouldShowSpecialSinger) { - singer = pickSpecialSinger(); - } else { - singer = pickSinger(); - } - return { - project: { - name: "中 V 档案馆", - mascot: "知夏", - quote: "星河知海夏生光" - }, - status: 200, - version: VERSION, - time: Date.now(), - singer: singer - }; - }, - { - response: { - 200: t.Object({ - project: t.Object({ - name: t.String(), - mascot: t.String(), - quote: t.String() - }), - status: t.Number(), - version: t.String(), - time: t.Number(), - singer: t.Union([SingerObj, t.Array(SingerObj)]) - }) + return { + project: { + name: "中V档案馆", + mascot: "知夏", + quote: "星河知海夏生光" + }, + status: 200, + version: VERSION, + time: Date.now(), + singer: singer + }; }, - detail: { - summary: "Root route", - description: - "The root path. It returns a JSON object containing a random virtual singer, \ + { + response: { + 200: t.Object({ + project: t.Object({ + name: t.String(), + mascot: t.String(), + quote: t.String() + }), + status: t.Number(), + version: t.String(), + time: t.Number(), + singer: t.Union([SingerObj, t.Array(SingerObj)]) + }) + }, + detail: { + summary: "Root route", + description: + "The root path. It returns a JSON object containing a random virtual singer, \ backend version, current server time and other miscellaneous information." + } } - } -); + ) diff --git a/packages/elysia/routes/song/milestone.ts b/packages/elysia/routes/song/milestone.ts index 5390217..849cec9 100644 --- a/packages/elysia/routes/song/milestone.ts +++ b/packages/elysia/routes/song/milestone.ts @@ -2,7 +2,8 @@ import { Elysia, t } from "elysia"; import { dbMain } from "@core/drizzle"; import { bilibiliMetadata, latestVideoSnapshot } from "@core/drizzle/main/schema"; import { eq, and, gte, lt, desc } from "drizzle-orm"; -import { getShortTermETA } from "@core/db"; +import { getMilestoneETA } from "@core/db"; +import serverTiming from "@elysia/middlewares/timing"; type MileStoneType = "dendou" | "densetsu" | "shinwa"; @@ -12,10 +13,11 @@ const range = { shinwa: [5000000, 9999999, 10000000] }; -export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).get( +export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).use(serverTiming()).get( "/close-milestone/:type", - async (c) => { - const type = c.params.type; + async ({ params, timeLog }) => { + timeLog.startTime("retrieveCandidates") + const type = params.type; const min = range[type as MileStoneType][0]; const max = range[type as MileStoneType][1]; const data = await dbMain @@ -29,14 +31,19 @@ export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).get( eta: number; }; const result: Result[] = []; + timeLog.endTime("retrieveCandidates") + + timeLog.startTime("calculateETA"); for (let i = 0; i < data.length; i++) { const aid = data[i].bilibili_metadata.aid; - const eta = await getShortTermETA(aid, range[type as MileStoneType][2]); + const eta = await getMilestoneETA(aid, range[type as MileStoneType][2]); result.push({ ...data[i], eta }); } + timeLog.endTime("calculateETA"); + result.sort((a, b) => a.eta - b.eta); return result; }, diff --git a/packages/elysia/src/index.ts b/packages/elysia/src/index.ts index cfab212..3b57152 100644 --- a/packages/elysia/src/index.ts +++ b/packages/elysia/src/index.ts @@ -7,17 +7,38 @@ import { getSongInfoHandler } from "@elysia/routes/song/info"; import { rootHandler } from "@elysia/routes/root"; import { getVideoMetadataHandler } from "@elysia/routes/video/metadata"; import { closeMileStoneHandler } from "@elysia/routes/song/milestone"; -import { serverTiming } from '@elysiajs/server-timing' +import serverTiming from "@elysia/middlewares/timing"; const [host, port] = getBindingInfo(); logStartup(host, port); +const encoder = new TextEncoder(); + const app = new Elysia({ serve: { hostname: host } }) -.use(serverTiming()) + .onAfterHandle({ as: "global" }, ({ responseValue, set, request }) => { + const contentType = request.headers.get("Content-Type") || ""; + const accept = request.headers.get("Accept") || ""; + const secFetchMode = request.headers.get("Sec-Fetch-Mode"); + const requestJson = contentType.includes("application/json"); + const isBrowser = !requestJson && (accept.includes("text/html") || secFetchMode === "navigate"); + const responseValueType = typeof responseValue; + const isObject = responseValueType === "object"; + const response = isObject + ? responseValue + : { + message: responseValue + }; + const text = isBrowser ? JSON.stringify(response, null, 2) : JSON.stringify(response); + return new Response(encoder.encode(text), { + headers: { + "Content-Type": "application/json; charset=utf-8" + } + }); + }) .use(cors()) .use(openapi()) .use(rootHandler)