add: the server timing & prettifier plugin, update the archiveSnapshots logic
This commit is contained in:
parent
f511a2bfdf
commit
87d9066fa1
@ -3,6 +3,6 @@
|
||||
"tabWidth": 4,
|
||||
"trailingComma": "none",
|
||||
"singleQuote": false,
|
||||
"printWidth": 120,
|
||||
"printWidth": 100,
|
||||
"endOfLine": "lf"
|
||||
}
|
||||
|
||||
1
bun.lock
1
bun.lock
@ -63,6 +63,7 @@
|
||||
"@bull-board/express": "^6.9.5",
|
||||
"@huggingface/transformers": "^3.5.1",
|
||||
"bullmq": "^5.52.1",
|
||||
"date-fns": "^4.1.0",
|
||||
"express": "^5.1.0",
|
||||
"ioredis": "^5.6.1",
|
||||
"onnxruntime-node": "1.19.2",
|
||||
|
||||
@ -1,10 +1,10 @@
|
||||
import { MINUTE, HOUR, getClosetMilestone } from "@core/lib";
|
||||
import { getLatestSnapshot, getClosestSnapshot } from "@core/db";
|
||||
|
||||
export const getShortTermETA = async (aid: number, targetViews?: number): Promise<number> => {
|
||||
export const getMilestoneETA = async (aid: number, targetViews?: number): Promise<number> => {
|
||||
const DELTA = 1e-5;
|
||||
let minETAHours = Infinity;
|
||||
const timeIntervals = [20 * MINUTE, HOUR, 3 * HOUR, 6 * HOUR, 24 * HOUR, 72 * HOUR, 168 * HOUR];
|
||||
const timeIntervals = [3 * HOUR, 24 * HOUR, 96 * HOUR];
|
||||
const currentTimestamp = new Date().getTime();
|
||||
const latestSnapshot = await getLatestSnapshot(aid);
|
||||
const latestSnapshotTime = new Date(latestSnapshot.time).getTime();
|
||||
|
||||
@ -341,3 +341,21 @@ export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, ty
|
||||
`;
|
||||
return rows.map((r) => Number(r.aid));
|
||||
}
|
||||
|
||||
export async function getCommonArchiveAids(sql: Psql) {
|
||||
const rows = await sql<{ aid: string }[]>`
|
||||
SELECT b.aid
|
||||
FROM bilibili_metadata b
|
||||
LEFT JOIN snapshot_schedule ss ON
|
||||
b.aid = ss.aid AND
|
||||
(ss.status = 'pending' OR ss.status = 'processing') AND
|
||||
ss.type = 'archive'
|
||||
WHERE ss.aid IS NULL
|
||||
AND NOT EXISTS (
|
||||
SELECT 1
|
||||
FROM songs s
|
||||
WHERE s.aid = b.aid
|
||||
)
|
||||
`;
|
||||
return rows.map((r) => Number(r.aid));
|
||||
}
|
||||
|
||||
@ -1,28 +1,34 @@
|
||||
import { Job } from "bullmq";
|
||||
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule";
|
||||
import {
|
||||
getCommonArchiveAids,
|
||||
getVideosWithoutActiveSnapshotScheduleByType,
|
||||
scheduleSnapshot
|
||||
} from "db/snapshotSchedule";
|
||||
import logger from "@core/log";
|
||||
import { lockManager } from "@core/mq/lockManager";
|
||||
import { getLatestVideoSnapshot } from "db/snapshot";
|
||||
import { MINUTE } from "@core/lib";
|
||||
import { sql } from "@core/db/dbNew";
|
||||
import {
|
||||
nextMonday,
|
||||
nextSaturday,
|
||||
formatDistanceStrict,
|
||||
intervalToDuration,
|
||||
formatDuration
|
||||
} from "date-fns";
|
||||
|
||||
function getNextSaturdayMidnightTimestamp(): number {
|
||||
const now = new Date();
|
||||
const currentDay = now.getDay();
|
||||
|
||||
let daysUntilNextSaturday = (6 - currentDay + 7) % 7;
|
||||
|
||||
if (daysUntilNextSaturday === 0) {
|
||||
daysUntilNextSaturday = 7;
|
||||
}
|
||||
|
||||
const nextSaturday = new Date(now);
|
||||
nextSaturday.setDate(nextSaturday.getDate() + daysUntilNextSaturday);
|
||||
nextSaturday.setHours(0, 0, 0, 0);
|
||||
|
||||
return nextSaturday.getTime();
|
||||
function randomTimestampBetween(start: Date, end: Date) {
|
||||
const startMs = start.getTime();
|
||||
const endMs = end.getTime();
|
||||
const randomMs = startMs + Math.random() * (endMs - startMs);
|
||||
return Math.floor(randomMs);
|
||||
}
|
||||
|
||||
const getRandomTimeInNextWeek = (): number => {
|
||||
const secondMonday = nextMonday(new Date());
|
||||
const thirdMonday = nextMonday(secondMonday);
|
||||
return randomTimestampBetween(secondMonday, thirdMonday);
|
||||
};
|
||||
|
||||
export const archiveSnapshotsWorker = async (_job: Job) => {
|
||||
try {
|
||||
const startedAt = Date.now();
|
||||
@ -34,17 +40,37 @@ export const archiveSnapshotsWorker = async (_job: Job) => {
|
||||
const aids = await getVideosWithoutActiveSnapshotScheduleByType(sql, "archive");
|
||||
for (const rawAid of aids) {
|
||||
const aid = Number(rawAid);
|
||||
const latestSnapshot = await getLatestVideoSnapshot(sql, aid);
|
||||
const now = Date.now();
|
||||
const lastSnapshotedAt = latestSnapshot?.time ?? now;
|
||||
const nextSatMidnight = getNextSaturdayMidnightTimestamp();
|
||||
const interval = nextSatMidnight - now;
|
||||
const date = new Date();
|
||||
const formatted = formatDistanceStrict(date, nextSaturday(date).getTime(), {
|
||||
unit: "hour"
|
||||
});
|
||||
logger.log(
|
||||
`Scheduled archive snapshot for aid ${aid} in ${interval} hours.`,
|
||||
`Scheduled archive snapshot for aid ${aid} in ${formatted}.`,
|
||||
"mq",
|
||||
"fn:archiveSnapshotsWorker"
|
||||
);
|
||||
await scheduleSnapshot(sql, aid, "archive", nextSaturday(date).getTime());
|
||||
if (now - startedAt > 30 * MINUTE) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
const aids2 = await getCommonArchiveAids(sql);
|
||||
for (const rawAid of aids2) {
|
||||
const aid = Number(rawAid);
|
||||
const now = Date.now();
|
||||
const targetTime = getRandomTimeInNextWeek();
|
||||
const interval = intervalToDuration({
|
||||
start: new Date(),
|
||||
end: new Date(targetTime)
|
||||
});
|
||||
const formatted = formatDuration(interval, { format: ["days", "hours"] });
|
||||
|
||||
logger.log(
|
||||
`Scheduled archive snapshot for aid ${aid} in ${formatted}.`,
|
||||
"mq",
|
||||
"fn:archiveSnapshotsWorker"
|
||||
);
|
||||
const targetTime = lastSnapshotedAt + interval;
|
||||
await scheduleSnapshot(sql, aid, "archive", targetTime);
|
||||
if (now - startedAt > 30 * MINUTE) {
|
||||
return;
|
||||
|
||||
@ -19,9 +19,10 @@
|
||||
"@bull-board/express": "^6.9.5",
|
||||
"@huggingface/transformers": "^3.5.1",
|
||||
"bullmq": "^5.52.1",
|
||||
"date-fns": "^4.1.0",
|
||||
"express": "^5.1.0",
|
||||
"ioredis": "^5.6.1",
|
||||
"postgres": "^3.4.5",
|
||||
"onnxruntime-node": "1.19.2"
|
||||
"onnxruntime-node": "1.19.2",
|
||||
"postgres": "^3.4.5"
|
||||
}
|
||||
}
|
||||
|
||||
278
packages/elysia/middlewares/timing.ts
Normal file
278
packages/elysia/middlewares/timing.ts
Normal file
@ -0,0 +1,278 @@
|
||||
import {
|
||||
Elysia,
|
||||
type MapResponse,
|
||||
type Context,
|
||||
type TraceEvent,
|
||||
type TraceProcess
|
||||
} from 'elysia'
|
||||
|
||||
type MaybePromise<T> = T | Promise<T>
|
||||
|
||||
class TimeLogger {
|
||||
private startTimes: Map<string, number>
|
||||
private durations: Map<string, number>
|
||||
private totalStartTime: number | null
|
||||
|
||||
constructor() {
|
||||
this.startTimes = new Map()
|
||||
this.durations = new Map()
|
||||
this.totalStartTime = null
|
||||
}
|
||||
|
||||
startTime(name: string) {
|
||||
this.startTimes.set(name, performance.now())
|
||||
}
|
||||
|
||||
endTime(name: string) {
|
||||
const startTime = this.startTimes.get(name)
|
||||
if (startTime !== undefined) {
|
||||
const duration = performance.now() - startTime
|
||||
this.durations.set(name, duration)
|
||||
this.startTimes.delete(name)
|
||||
}
|
||||
}
|
||||
|
||||
getCompletedDurations() {
|
||||
return Array.from(this.durations.entries()).map(([name, duration]) => ({
|
||||
name,
|
||||
duration
|
||||
}))
|
||||
}
|
||||
|
||||
startTotal(): void {
|
||||
this.totalStartTime = performance.now()
|
||||
}
|
||||
|
||||
endTotal(): number | null {
|
||||
if (this.totalStartTime === null) return null
|
||||
return performance.now() - this.totalStartTime
|
||||
}
|
||||
}
|
||||
|
||||
export interface ServerTimingOptions {
|
||||
/**
|
||||
* Should Elysia report data back to client via 'Server-Sent-Event'
|
||||
*/
|
||||
report?: boolean
|
||||
/**
|
||||
* Allow Server Timing to log specified life-cycle events
|
||||
*/
|
||||
trace?: {
|
||||
/**
|
||||
* Capture duration from request
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
request?: boolean
|
||||
/**
|
||||
* Capture duration from parse
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
parse?: boolean
|
||||
/**
|
||||
* Capture duration from transform
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
transform?: boolean
|
||||
/**
|
||||
* Capture duration from beforeHandle
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
beforeHandle?: boolean
|
||||
/**
|
||||
* Capture duration from handle
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
handle?: boolean
|
||||
/**
|
||||
* Capture duration from afterHandle
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
afterHandle?: boolean
|
||||
/**
|
||||
* Capture duration from mapResponse
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
error?: boolean
|
||||
/**
|
||||
* Capture duration from mapResponse
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
mapResponse?: boolean
|
||||
/**
|
||||
* Capture total duration from start to finish
|
||||
*
|
||||
* @default true
|
||||
*/
|
||||
total?: boolean
|
||||
}
|
||||
/**
|
||||
* Determine whether or not Server Timing should be enabled
|
||||
*
|
||||
* @default NODE_ENV !== 'production'
|
||||
*/
|
||||
enabled?: boolean
|
||||
/**
|
||||
* A condition whether server timing should be log
|
||||
*
|
||||
* @default undefined
|
||||
*/
|
||||
allow?:
|
||||
| MaybePromise<boolean>
|
||||
| ((context: Omit<Context, 'path'>) => MaybePromise<boolean>)
|
||||
/**
|
||||
* A custom mapResponse provided by user
|
||||
*
|
||||
* @default undefined
|
||||
*/
|
||||
mapResponse?: MapResponse
|
||||
}
|
||||
|
||||
const getLabel = (
|
||||
event: TraceEvent,
|
||||
listener: (
|
||||
callback: (process: TraceProcess<'begin', true>) => unknown
|
||||
) => unknown,
|
||||
write: (value: string) => void
|
||||
) => {
|
||||
listener(async ({ onStop, onEvent, total }) => {
|
||||
let label = ''
|
||||
|
||||
if (total === 0) return
|
||||
|
||||
onEvent(({ name, index, onStop }) => {
|
||||
onStop(({ elapsed }) => {
|
||||
label += `${event}.${index}.${name || 'anon'};dur=${elapsed},`
|
||||
})
|
||||
})
|
||||
|
||||
onStop(({ elapsed }) => {
|
||||
label += `${event};dur=${elapsed},`
|
||||
|
||||
write(label)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
export const serverTiming = ({
|
||||
allow,
|
||||
enabled = process.env.NODE_ENV !== 'production',
|
||||
trace: {
|
||||
request: traceRequest = true,
|
||||
parse: traceParse = true,
|
||||
transform: traceTransform = true,
|
||||
beforeHandle: traceBeforeHandle = true,
|
||||
handle: traceHandle = true,
|
||||
afterHandle: traceAfterHandle = true,
|
||||
error: traceError = true,
|
||||
mapResponse: traceMapResponse = true,
|
||||
total: traceTotal = true
|
||||
} = {},
|
||||
mapResponse
|
||||
}: ServerTimingOptions = {}) => {
|
||||
const app = new Elysia().decorate('timeLog', new TimeLogger()).trace(
|
||||
{ as: 'global' },
|
||||
async ({
|
||||
onRequest,
|
||||
onParse,
|
||||
onTransform,
|
||||
onBeforeHandle,
|
||||
onHandle,
|
||||
onAfterHandle,
|
||||
onMapResponse,
|
||||
onError,
|
||||
set,
|
||||
context,
|
||||
response,
|
||||
context: {
|
||||
request: { method }
|
||||
}
|
||||
}) => {
|
||||
|
||||
if (!enabled) return
|
||||
let label = ''
|
||||
|
||||
const write = (nextValue: string) => {
|
||||
label += nextValue
|
||||
}
|
||||
|
||||
let start: number
|
||||
|
||||
onRequest(({ begin }) => {
|
||||
context.timeLog.startTotal()
|
||||
start = begin
|
||||
})
|
||||
|
||||
if (traceRequest) getLabel('request', onRequest, write)
|
||||
if (traceParse) getLabel('parse', onParse, write)
|
||||
if (traceTransform) getLabel('transform', onTransform, write)
|
||||
if (traceBeforeHandle)
|
||||
getLabel('beforeHandle', onBeforeHandle, write)
|
||||
if (traceAfterHandle) getLabel('afterHandle', onAfterHandle, write)
|
||||
if (traceError) getLabel('error', onError, write)
|
||||
if (traceMapResponse) getLabel('mapResponse', onMapResponse, write)
|
||||
|
||||
if (traceHandle)
|
||||
onHandle(({ name, onStop }) => {
|
||||
onStop(({ elapsed }) => {
|
||||
label += `handle.${name};dur=${elapsed},`
|
||||
})
|
||||
})
|
||||
|
||||
onMapResponse(({ onStop }) => {
|
||||
onStop(async ({ end }) => {
|
||||
const completedDurations =
|
||||
context.timeLog.getCompletedDurations()
|
||||
if (completedDurations.length > 0) {
|
||||
label +=
|
||||
completedDurations
|
||||
.map(
|
||||
({ name, duration }) =>
|
||||
`${name};dur=${duration}`
|
||||
)
|
||||
.join(', ') + ','
|
||||
}
|
||||
const elapsed = context.timeLog.endTotal();
|
||||
|
||||
let allowed = allow
|
||||
if (allowed instanceof Promise) allowed = await allowed
|
||||
|
||||
if (traceTotal) label += `total;dur=${elapsed}`
|
||||
else label = label.slice(0, -1)
|
||||
|
||||
// ? Must wait until request is reported
|
||||
switch (typeof allowed) {
|
||||
case 'boolean':
|
||||
if (allowed === false)
|
||||
delete set.headers['Server-Timing']
|
||||
|
||||
set.headers['Server-Timing'] = label
|
||||
|
||||
break
|
||||
|
||||
case 'function':
|
||||
if ((await allowed(context)) === false)
|
||||
delete set.headers['Server-Timing']
|
||||
|
||||
set.headers['Server-Timing'] = label
|
||||
|
||||
break
|
||||
|
||||
default:
|
||||
set.headers['Server-Timing'] = label
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
)
|
||||
return app
|
||||
}
|
||||
|
||||
export default serverTiming
|
||||
@ -9,53 +9,54 @@ const SingerObj = t.Object({
|
||||
message: t.Optional(t.String())
|
||||
});
|
||||
|
||||
export const rootHandler = new Elysia().get(
|
||||
"/",
|
||||
async () => {
|
||||
let singer: Singer | Singer[];
|
||||
const shouldShowSpecialSinger = Math.random() < 0.016;
|
||||
if (getSingerForBirthday().length !== 0) {
|
||||
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
|
||||
for (const s of singer) {
|
||||
delete s.birthday;
|
||||
s.message = `祝${s.name}生日快乐~`;
|
||||
export const rootHandler = new Elysia()
|
||||
.get(
|
||||
"/",
|
||||
async () => {
|
||||
let singer: Singer | Singer[];
|
||||
const shouldShowSpecialSinger = Math.random() < 0.016;
|
||||
if (getSingerForBirthday().length !== 0) {
|
||||
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
|
||||
for (const s of singer) {
|
||||
delete s.birthday;
|
||||
s.message = `祝${s.name}生日快乐~`;
|
||||
}
|
||||
} else if (shouldShowSpecialSinger) {
|
||||
singer = pickSpecialSinger();
|
||||
} else {
|
||||
singer = pickSinger();
|
||||
}
|
||||
} else if (shouldShowSpecialSinger) {
|
||||
singer = pickSpecialSinger();
|
||||
} else {
|
||||
singer = pickSinger();
|
||||
}
|
||||
return {
|
||||
project: {
|
||||
name: "中 V 档案馆",
|
||||
mascot: "知夏",
|
||||
quote: "星河知海夏生光"
|
||||
},
|
||||
status: 200,
|
||||
version: VERSION,
|
||||
time: Date.now(),
|
||||
singer: singer
|
||||
};
|
||||
},
|
||||
{
|
||||
response: {
|
||||
200: t.Object({
|
||||
project: t.Object({
|
||||
name: t.String(),
|
||||
mascot: t.String(),
|
||||
quote: t.String()
|
||||
}),
|
||||
status: t.Number(),
|
||||
version: t.String(),
|
||||
time: t.Number(),
|
||||
singer: t.Union([SingerObj, t.Array(SingerObj)])
|
||||
})
|
||||
return {
|
||||
project: {
|
||||
name: "中V档案馆",
|
||||
mascot: "知夏",
|
||||
quote: "星河知海夏生光"
|
||||
},
|
||||
status: 200,
|
||||
version: VERSION,
|
||||
time: Date.now(),
|
||||
singer: singer
|
||||
};
|
||||
},
|
||||
detail: {
|
||||
summary: "Root route",
|
||||
description:
|
||||
"The root path. It returns a JSON object containing a random virtual singer, \
|
||||
{
|
||||
response: {
|
||||
200: t.Object({
|
||||
project: t.Object({
|
||||
name: t.String(),
|
||||
mascot: t.String(),
|
||||
quote: t.String()
|
||||
}),
|
||||
status: t.Number(),
|
||||
version: t.String(),
|
||||
time: t.Number(),
|
||||
singer: t.Union([SingerObj, t.Array(SingerObj)])
|
||||
})
|
||||
},
|
||||
detail: {
|
||||
summary: "Root route",
|
||||
description:
|
||||
"The root path. It returns a JSON object containing a random virtual singer, \
|
||||
backend version, current server time and other miscellaneous information."
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
)
|
||||
|
||||
@ -2,7 +2,8 @@ import { Elysia, t } from "elysia";
|
||||
import { dbMain } from "@core/drizzle";
|
||||
import { bilibiliMetadata, latestVideoSnapshot } from "@core/drizzle/main/schema";
|
||||
import { eq, and, gte, lt, desc } from "drizzle-orm";
|
||||
import { getShortTermETA } from "@core/db";
|
||||
import { getMilestoneETA } from "@core/db";
|
||||
import serverTiming from "@elysia/middlewares/timing";
|
||||
|
||||
type MileStoneType = "dendou" | "densetsu" | "shinwa";
|
||||
|
||||
@ -12,10 +13,11 @@ const range = {
|
||||
shinwa: [5000000, 9999999, 10000000]
|
||||
};
|
||||
|
||||
export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).get(
|
||||
export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).use(serverTiming()).get(
|
||||
"/close-milestone/:type",
|
||||
async (c) => {
|
||||
const type = c.params.type;
|
||||
async ({ params, timeLog }) => {
|
||||
timeLog.startTime("retrieveCandidates")
|
||||
const type = params.type;
|
||||
const min = range[type as MileStoneType][0];
|
||||
const max = range[type as MileStoneType][1];
|
||||
const data = await dbMain
|
||||
@ -29,14 +31,19 @@ export const closeMileStoneHandler = new Elysia({ prefix: "/song" }).get(
|
||||
eta: number;
|
||||
};
|
||||
const result: Result[] = [];
|
||||
timeLog.endTime("retrieveCandidates")
|
||||
|
||||
timeLog.startTime("calculateETA");
|
||||
for (let i = 0; i < data.length; i++) {
|
||||
const aid = data[i].bilibili_metadata.aid;
|
||||
const eta = await getShortTermETA(aid, range[type as MileStoneType][2]);
|
||||
const eta = await getMilestoneETA(aid, range[type as MileStoneType][2]);
|
||||
result.push({
|
||||
...data[i],
|
||||
eta
|
||||
});
|
||||
}
|
||||
timeLog.endTime("calculateETA");
|
||||
|
||||
result.sort((a, b) => a.eta - b.eta);
|
||||
return result;
|
||||
},
|
||||
|
||||
@ -7,17 +7,38 @@ import { getSongInfoHandler } from "@elysia/routes/song/info";
|
||||
import { rootHandler } from "@elysia/routes/root";
|
||||
import { getVideoMetadataHandler } from "@elysia/routes/video/metadata";
|
||||
import { closeMileStoneHandler } from "@elysia/routes/song/milestone";
|
||||
import { serverTiming } from '@elysiajs/server-timing'
|
||||
import serverTiming from "@elysia/middlewares/timing";
|
||||
|
||||
const [host, port] = getBindingInfo();
|
||||
logStartup(host, port);
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
const app = new Elysia({
|
||||
serve: {
|
||||
hostname: host
|
||||
}
|
||||
})
|
||||
.use(serverTiming())
|
||||
.onAfterHandle({ as: "global" }, ({ responseValue, set, request }) => {
|
||||
const contentType = request.headers.get("Content-Type") || "";
|
||||
const accept = request.headers.get("Accept") || "";
|
||||
const secFetchMode = request.headers.get("Sec-Fetch-Mode");
|
||||
const requestJson = contentType.includes("application/json");
|
||||
const isBrowser = !requestJson && (accept.includes("text/html") || secFetchMode === "navigate");
|
||||
const responseValueType = typeof responseValue;
|
||||
const isObject = responseValueType === "object";
|
||||
const response = isObject
|
||||
? responseValue
|
||||
: {
|
||||
message: responseValue
|
||||
};
|
||||
const text = isBrowser ? JSON.stringify(response, null, 2) : JSON.stringify(response);
|
||||
return new Response(encoder.encode(text), {
|
||||
headers: {
|
||||
"Content-Type": "application/json; charset=utf-8"
|
||||
}
|
||||
});
|
||||
})
|
||||
.use(cors())
|
||||
.use(openapi())
|
||||
.use(rootHandler)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user