From 30f8a2ffe81407da1c1395c5333e7fdee9ad7a47 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 16 Nov 2025 20:14:13 +0800 Subject: [PATCH] ref: switch to drizzle, add history --- packages/backend/lib/auth.ts | 34 +- packages/backend/lib/bilibiliID.ts | 9 +- packages/backend/lib/mq.ts | 2 +- packages/backend/middlewares/timing.ts | 187 +++++---- packages/backend/routes/auth/login.ts | 107 +++--- packages/backend/routes/root/index.ts | 97 +++-- packages/backend/routes/search/index.ts | 10 +- packages/backend/routes/song/add.ts | 10 +- packages/backend/routes/song/delete.ts | 11 +- packages/backend/routes/song/info.ts | 47 +-- packages/backend/routes/song/milestone.ts | 13 +- packages/backend/routes/video/eta.ts | 3 +- packages/backend/routes/video/metadata.ts | 5 +- packages/backend/routes/video/snapshots.ts | 5 +- packages/backend/src/index.ts | 11 +- packages/backend/src/mq.ts | 21 ++ packages/backend/src/onAfterHandle.ts | 20 +- packages/core/db/dbNew.ts | 8 +- .../core/db/snapshots/getClosetSnapshot.ts | 4 +- .../core/db/snapshots/getLatestSnapshot.ts | 9 +- packages/core/db/snapshots/milestone.ts | 5 +- .../drizzle/cred/0000_moaning_shotgun.sql | 44 --- .../core/drizzle/cred/meta/0000_snapshot.json | 350 ----------------- packages/core/drizzle/cred/meta/_journal.json | 13 - packages/core/drizzle/cred/relations.ts | 2 - packages/core/drizzle/cred/schema.ts | 93 ----- packages/core/drizzle/index.ts | 8 +- packages/core/drizzle/main/relations.ts | 43 ++- packages/core/drizzle/main/schema.ts | 354 +++++++++++------- .../core/drizzle/{outerSchema.ts => type.ts} | 0 packages/core/lib/index.ts | 1 + packages/core/lib/type.ts | 1 + packages/crawler/db/bilibili_metadata.ts | 116 +++--- packages/crawler/db/snapshot.ts | 77 ++-- packages/crawler/ml/akari_api.ts | 102 ++--- packages/crawler/ml/api_manager.ts | 226 +++++------ packages/crawler/ml/manager.ts | 6 +- packages/crawler/mq/exec/classifyVideo.ts | 21 +- .../crawler/mq/exec/collectQueueMetrics.ts | 26 +- packages/crawler/mq/exec/directSnapshot.ts | 10 +- .../mq/exec/dispatchRegularSnapshots.ts | 11 +- packages/crawler/mq/exec/getVideoInfo.ts | 105 +++++- packages/crawler/mq/index.ts | 10 +- packages/crawler/mq/schema.ts | 6 + packages/crawler/mq/task/collectSongs.ts | 70 ++-- packages/crawler/mq/task/getVideoDetails.ts | 72 ---- packages/crawler/mq/task/queueLatestVideo.ts | 2 +- packages/crawler/net/getLatestVideoAids.ts | 5 +- packages/crawler/net/getVideoDetails.ts | 10 +- packages/crawler/src/filterWorker.ts | 2 +- packages/crawler/src/worker.ts | 4 +- 51 files changed, 1051 insertions(+), 1357 deletions(-) create mode 100644 packages/backend/src/mq.ts delete mode 100644 packages/core/drizzle/cred/0000_moaning_shotgun.sql delete mode 100644 packages/core/drizzle/cred/meta/0000_snapshot.json delete mode 100644 packages/core/drizzle/cred/meta/_journal.json delete mode 100644 packages/core/drizzle/cred/relations.ts delete mode 100644 packages/core/drizzle/cred/schema.ts rename packages/core/drizzle/{outerSchema.ts => type.ts} (100%) create mode 100644 packages/core/lib/type.ts delete mode 100644 packages/crawler/mq/task/getVideoDetails.ts diff --git a/packages/backend/lib/auth.ts b/packages/backend/lib/auth.ts index d4d614d..17ea336 100644 --- a/packages/backend/lib/auth.ts +++ b/packages/backend/lib/auth.ts @@ -1,7 +1,6 @@ import Argon2id from "@rabbit-company/argon2id"; -import { dbMain } from "@core/drizzle"; -import { usersInCredentials, loginSessionsInCredentials } from "@core/drizzle/main/schema"; -import { eq, and, isNull } from "drizzle-orm"; +import { db, usersInCredentials, loginSessionsInCredentials } from "@core/drizzle"; +import { eq, and, isNull, getTableColumns } from "drizzle-orm"; import { generate as generateId } from "@alikia/random-key"; import logger from "@core/log"; @@ -10,10 +9,11 @@ export interface User { username: string; nickname: string | null; role: string; + unqId: string; } export async function verifyUser(username: string, password: string): Promise { - const user = await dbMain + const user = await db .select() .from(usersInCredentials) .where(eq(usersInCredentials.username, username)) @@ -33,7 +33,8 @@ export async function verifyUser(username: string, password: string): Promise { - const session = await dbMain - .select() + const session = await db + .select({ + ...getTableColumns(usersInCredentials), + ...getTableColumns(loginSessionsInCredentials) + }) .from(loginSessionsInCredentials) + .innerJoin(usersInCredentials, eq(loginSessionsInCredentials.uid, usersInCredentials.id)) .where( and( eq(loginSessionsInCredentials.id, sessionId), @@ -88,7 +93,7 @@ export async function validateSession( return null; } - const user = await dbMain + const user = await db .select() .from(usersInCredentials) .where(eq(usersInCredentials.id, foundSession.uid)) @@ -98,24 +103,19 @@ export async function validateSession( return null; } - await dbMain + await db .update(loginSessionsInCredentials) .set({ lastUsedAt: new Date().toISOString() }) .where(eq(loginSessionsInCredentials.id, sessionId)); return { - user: { - id: user[0].id, - username: user[0].username, - nickname: user[0].nickname, - role: user[0].role - }, + user: user[0], session: foundSession }; } export async function deactivateSession(sessionId: string): Promise { - const result = await dbMain + const result = await db .update(loginSessionsInCredentials) .set({ deactivatedAt: new Date().toISOString() diff --git a/packages/backend/lib/bilibiliID.ts b/packages/backend/lib/bilibiliID.ts index be2e980..d50e079 100644 --- a/packages/backend/lib/bilibiliID.ts +++ b/packages/backend/lib/bilibiliID.ts @@ -40,13 +40,12 @@ export function detectBiliID(id: string) { return { type: "bv" as const, id: id as `BV1${string}` - } - } - else if (avSchema.safeParse(id).success) { + }; + } else if (avSchema.safeParse(id).success) { return { type: "av" as const, id: id as `av${string}` - } + }; } return null; } @@ -61,4 +60,4 @@ export function biliIDToAID(id: string) { } else { return Number.parseInt(detected.id.slice(2)); } -} \ No newline at end of file +} diff --git a/packages/backend/lib/mq.ts b/packages/backend/lib/mq.ts index 4c8256d..08c29f0 100644 --- a/packages/backend/lib/mq.ts +++ b/packages/backend/lib/mq.ts @@ -7,4 +7,4 @@ export const LatestVideosQueue = new Queue("latestVideos", { export const SnapshotQueue = new Queue("snapshot", { connection: redis as ConnectionOptions -}); \ No newline at end of file +}); diff --git a/packages/backend/middlewares/timing.ts b/packages/backend/middlewares/timing.ts index b60818f..0d7e8e8 100644 --- a/packages/backend/middlewares/timing.ts +++ b/packages/backend/middlewares/timing.ts @@ -1,34 +1,28 @@ -import { - Elysia, - type MapResponse, - type Context, - type TraceEvent, - type TraceProcess -} from 'elysia' +import { Elysia, type MapResponse, type Context, type TraceEvent, type TraceProcess } from "elysia"; -type MaybePromise = T | Promise +type MaybePromise = T | Promise; class TimeLogger { - private startTimes: Map - private durations: Map - private totalStartTime: number | null + private startTimes: Map; + private durations: Map; + private totalStartTime: number | null; constructor() { - this.startTimes = new Map() - this.durations = new Map() - this.totalStartTime = null + this.startTimes = new Map(); + this.durations = new Map(); + this.totalStartTime = null; } startTime(name: string) { - this.startTimes.set(name, performance.now()) + this.startTimes.set(name, performance.now()); } endTime(name: string) { - const startTime = this.startTimes.get(name) + const startTime = this.startTimes.get(name); if (startTime !== undefined) { - const duration = performance.now() - startTime - this.durations.set(name, duration) - this.startTimes.delete(name) + const duration = performance.now() - startTime; + this.durations.set(name, duration); + this.startTimes.delete(name); } } @@ -36,16 +30,16 @@ class TimeLogger { return Array.from(this.durations.entries()).map(([name, duration]) => ({ name, duration - })) + })); } startTotal(): void { - this.totalStartTime = performance.now() + this.totalStartTime = performance.now(); } endTotal(): number | null { - if (this.totalStartTime === null) return null - return performance.now() - this.totalStartTime + if (this.totalStartTime === null) return null; + return performance.now() - this.totalStartTime; } } @@ -53,7 +47,7 @@ export interface ServerTimingOptions { /** * Should Elysia report data back to client via 'Server-Sent-Event' */ - report?: boolean + report?: boolean; /** * Allow Server Timing to log specified life-cycle events */ @@ -63,107 +57,103 @@ export interface ServerTimingOptions { * * @default true */ - request?: boolean + request?: boolean; /** * Capture duration from parse * * @default true */ - parse?: boolean + parse?: boolean; /** * Capture duration from transform * * @default true */ - transform?: boolean + transform?: boolean; /** * Capture duration from beforeHandle * * @default true */ - beforeHandle?: boolean + beforeHandle?: boolean; /** * Capture duration from handle * * @default true */ - handle?: boolean + handle?: boolean; /** * Capture duration from afterHandle * * @default true */ - afterHandle?: boolean + afterHandle?: boolean; /** * Capture duration from mapResponse * * @default true */ - error?: boolean + error?: boolean; /** * Capture duration from mapResponse * * @default true */ - mapResponse?: boolean + mapResponse?: boolean; /** * Capture total duration from start to finish * * @default true */ - total?: boolean - } + total?: boolean; + }; /** * Determine whether or not Server Timing should be enabled * * @default NODE_ENV !== 'production' */ - enabled?: boolean + enabled?: boolean; /** * A condition whether server timing should be log * * @default undefined */ - allow?: - | MaybePromise - | ((context: Omit) => MaybePromise) + allow?: MaybePromise | ((context: Omit) => MaybePromise); /** * A custom mapResponse provided by user * * @default undefined */ - mapResponse?: MapResponse + mapResponse?: MapResponse; } const getLabel = ( event: TraceEvent, - listener: ( - callback: (process: TraceProcess<'begin', true>) => unknown - ) => unknown, + listener: (callback: (process: TraceProcess<"begin", true>) => unknown) => unknown, write: (value: string) => void ) => { listener(async ({ onStop, onEvent, total }) => { - let label = '' + let label = ""; - if (total === 0) return + if (total === 0) return; onEvent(({ name, index, onStop }) => { onStop(({ elapsed }) => { - label += `${event}.${index}.${name || 'anon'};dur=${elapsed},` - }) - }) + label += `${event}.${index}.${name || "anon"};dur=${elapsed},`; + }); + }); onStop(({ elapsed }) => { - label += `${event};dur=${elapsed},` + label += `${event};dur=${elapsed},`; - write(label) - }) - }) -} + write(label); + }); + }); +}; export const serverTiming = ({ allow, - enabled = process.env.NODE_ENV !== 'production', + enabled = process.env.NODE_ENV !== "production", trace: { request: traceRequest = true, parse: traceParse = true, @@ -177,8 +167,8 @@ export const serverTiming = ({ } = {}, mapResponse }: ServerTimingOptions = {}) => { - const app = new Elysia().decorate('timeLog', new TimeLogger()).trace( - { as: 'global' }, + const app = new Elysia().decorate("timeLog", new TimeLogger()).trace( + { as: "global" }, async ({ onRequest, onParse, @@ -195,84 +185,77 @@ export const serverTiming = ({ request: { method } } }) => { - - if (!enabled) return - let label = '' + if (!enabled) return; + let label = ""; const write = (nextValue: string) => { - label += nextValue - } + label += nextValue; + }; - let start: number + let start: number; onRequest(({ begin }) => { - context.timeLog.startTotal() - start = begin - }) + context.timeLog.startTotal(); + start = begin; + }); - if (traceRequest) getLabel('request', onRequest, write) - if (traceParse) getLabel('parse', onParse, write) - if (traceTransform) getLabel('transform', onTransform, write) - if (traceBeforeHandle) - getLabel('beforeHandle', onBeforeHandle, write) - if (traceAfterHandle) getLabel('afterHandle', onAfterHandle, write) - if (traceError) getLabel('error', onError, write) - if (traceMapResponse) getLabel('mapResponse', onMapResponse, write) + if (traceRequest) getLabel("request", onRequest, write); + if (traceParse) getLabel("parse", onParse, write); + if (traceTransform) getLabel("transform", onTransform, write); + if (traceBeforeHandle) getLabel("beforeHandle", onBeforeHandle, write); + if (traceAfterHandle) getLabel("afterHandle", onAfterHandle, write); + if (traceError) getLabel("error", onError, write); + if (traceMapResponse) getLabel("mapResponse", onMapResponse, write); if (traceHandle) onHandle(({ name, onStop }) => { onStop(({ elapsed }) => { - label += `handle.${name};dur=${elapsed},` - }) - }) + label += `handle.${name};dur=${elapsed},`; + }); + }); onMapResponse(({ onStop }) => { onStop(async ({ end }) => { - const completedDurations = - context.timeLog.getCompletedDurations() + const completedDurations = context.timeLog.getCompletedDurations(); if (completedDurations.length > 0) { label += completedDurations - .map( - ({ name, duration }) => - `${name};dur=${duration}` - ) - .join(', ') + ',' + .map(({ name, duration }) => `${name};dur=${duration}`) + .join(", ") + ","; } const elapsed = context.timeLog.endTotal(); - let allowed = allow - if (allowed instanceof Promise) allowed = await allowed + let allowed = allow; + if (allowed instanceof Promise) allowed = await allowed; - if (traceTotal) label += `total;dur=${elapsed}` - else label = label.slice(0, -1) + if (traceTotal) label += `total;dur=${elapsed}`; + else label = label.slice(0, -1); // ? Must wait until request is reported switch (typeof allowed) { - case 'boolean': - if (allowed === false) - delete set.headers['Server-Timing'] + case "boolean": + if (allowed === false) delete set.headers["Server-Timing"]; - set.headers['Server-Timing'] = label + set.headers["Server-Timing"] = label; - break + break; - case 'function': + case "function": if ((await allowed(context)) === false) - delete set.headers['Server-Timing'] + delete set.headers["Server-Timing"]; - set.headers['Server-Timing'] = label + set.headers["Server-Timing"] = label; - break + break; default: - set.headers['Server-Timing'] = label + set.headers["Server-Timing"] = label; } - }) - }) + }); + }); } - ) - return app -} + ); + return app; +}; -export default serverTiming +export default serverTiming; diff --git a/packages/backend/routes/auth/login.ts b/packages/backend/routes/auth/login.ts index dedf2c4..60d1d3c 100644 --- a/packages/backend/routes/auth/login.ts +++ b/packages/backend/routes/auth/login.ts @@ -2,60 +2,57 @@ import { Elysia, t } from "elysia"; import { ip } from "elysia-ip"; import { verifyUser, createSession, getSessionExpirationDate } from "@elysia/lib/auth"; -export const loginHandler = new Elysia({ prefix: "/auth" }) - .use(ip()) - .post( - "/session", - async ({ body, set, cookie, ip, request }) => { - const { username, password } = body; +export const loginHandler = new Elysia({ prefix: "/auth" }).use(ip()).post( + "/session", + async ({ body, set, cookie, ip, request }) => { + const { username, password } = body; - const user = await verifyUser(username, password); - if (!user) { - set.status = 401; - return { message: "Invalid credentials." }; - } - - const userAgent = request.headers.get("user-agent") || "Unknown"; - const sessionId = await createSession(user.id, ip || null, userAgent); - - const expiresAt = getSessionExpirationDate(); - cookie.sessionId.value = sessionId; - cookie.sessionId.httpOnly = true; - cookie.sessionId.secure = process.env.NODE_ENV === 'production'; - cookie.sessionId.sameSite = 'strict'; - cookie.sessionId.expires = expiresAt; - - return { - message: "You are logged in.", - user: { - id: user.id, - username: user.username, - nickname: user.nickname, - role: user.role - }, - sessionID: sessionId - }; - }, - { - response: { - 200: t.Object({ - message: t.String(), - user: t.Object({ - id: t.Integer(), - username: t.String(), - nickname: t.Optional(t.String()), - role: t.String() - }), - sessionID: t.String() - }), - 401: t.Object({ - message: t.String() - }) - }, - body: t.Object({ - username: t.String(), - password: t.String() - }) + const user = await verifyUser(username, password); + if (!user) { + set.status = 401; + return { message: "Invalid credentials." }; } - ) - \ No newline at end of file + + const userAgent = request.headers.get("user-agent") || "Unknown"; + const sessionId = await createSession(user.id, ip || null, userAgent); + + const expiresAt = getSessionExpirationDate(); + cookie.sessionId.value = sessionId; + cookie.sessionId.httpOnly = true; + cookie.sessionId.secure = process.env.NODE_ENV === "production"; + cookie.sessionId.sameSite = "strict"; + cookie.sessionId.expires = expiresAt; + + return { + message: "You are logged in.", + user: { + id: user.id, + username: user.username, + nickname: user.nickname, + role: user.role + }, + sessionID: sessionId + }; + }, + { + response: { + 200: t.Object({ + message: t.String(), + user: t.Object({ + id: t.Integer(), + username: t.String(), + nickname: t.Optional(t.String()), + role: t.String() + }), + sessionID: t.String() + }), + 401: t.Object({ + message: t.String() + }) + }, + body: t.Object({ + username: t.String(), + password: t.String() + }) + } +); diff --git a/packages/backend/routes/root/index.ts b/packages/backend/routes/root/index.ts index 31ed7b2..64965cb 100644 --- a/packages/backend/routes/root/index.ts +++ b/packages/backend/routes/root/index.ts @@ -9,54 +9,53 @@ const SingerObj = t.Object({ message: t.Optional(t.String()) }); -export const rootHandler = new Elysia() - .get( - "/", - async () => { - let singer: Singer | Singer[]; - const shouldShowSpecialSinger = Math.random() < 0.016; - if (getSingerForBirthday().length !== 0) { - singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[]; - for (const s of singer) { - delete s.birthday; - s.message = `祝${s.name}生日快乐~`; - } - } else if (shouldShowSpecialSinger) { - singer = pickSpecialSinger(); - } else { - singer = pickSinger(); - } - return { - project: { - name: "中V档案馆", - mascot: "知夏", - quote: "星河知海夏生光" - }, - status: 200, - version: VERSION, - time: Date.now(), - singer: singer - }; - }, - { - response: { - 200: t.Object({ - project: t.Object({ - name: t.String(), - mascot: t.String(), - quote: t.String() - }), - status: t.Number(), - version: t.String(), - time: t.Number(), - singer: t.Union([SingerObj, t.Array(SingerObj)]) - }) - }, - detail: { - summary: "Root route", - description: - "The root path. It returns a JSON object containing a random virtual singer, \ - backend version, current server time and other miscellaneous information." +export const rootHandler = new Elysia().get( + "/", + async () => { + let singer: Singer | Singer[]; + const shouldShowSpecialSinger = Math.random() < 0.016; + if (getSingerForBirthday().length !== 0) { + singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[]; + for (const s of singer) { + delete s.birthday; + s.message = `祝${s.name}生日快乐~`; } + } else if (shouldShowSpecialSinger) { + singer = pickSpecialSinger(); + } else { + singer = pickSinger(); } - ) + return { + project: { + name: "中V档案馆", + mascot: "知夏", + quote: "星河知海夏生光" + }, + status: 200, + version: VERSION, + time: Date.now(), + singer: singer + }; + }, + { + response: { + 200: t.Object({ + project: t.Object({ + name: t.String(), + mascot: t.String(), + quote: t.String() + }), + status: t.Number(), + version: t.String(), + time: t.Number(), + singer: t.Union([SingerObj, t.Array(SingerObj)]) + }) + }, + detail: { + summary: "Root route", + description: + "The root path. It returns a JSON object containing a random virtual singer, \ + backend version, current server time and other miscellaneous information." + } + } +); diff --git a/packages/backend/routes/search/index.ts b/packages/backend/routes/search/index.ts index aeae4ba..9ddc817 100644 --- a/packages/backend/routes/search/index.ts +++ b/packages/backend/routes/search/index.ts @@ -1,7 +1,6 @@ import { Elysia } from "elysia"; -import { db } from "@core/drizzle"; -import { bilibiliMetadata, latestVideoSnapshot, songs } from "@core/drizzle/main/schema"; -import { eq, like, or } from "drizzle-orm"; +import { db, bilibiliMetadata, latestVideoSnapshot, songs } from "@core/drizzle"; +import { eq, like } from "drizzle-orm"; import { BiliAPIVideoMetadataSchema, BiliVideoSchema, SongSchema } from "@elysia/lib/schema"; import { z } from "zod"; import { getVideoInfo } from "@core/net/getVideoInfo"; @@ -82,9 +81,8 @@ const getVideoSearchResult = async (searchQuery: string) => { let data; const cachedData = await retrieveVideoInfoFromCache(aid); if (cachedData) { - data = cachedData - } - else { + data = cachedData; + } else { data = await getVideoInfo(aid, "getVideoInfo"); if (typeof data === "number") return []; const cacheKey = `cvsa:videoInfo:av${aid}`; diff --git a/packages/backend/routes/song/add.ts b/packages/backend/routes/song/add.ts index fd16239..0512263 100644 --- a/packages/backend/routes/song/add.ts +++ b/packages/backend/routes/song/add.ts @@ -2,15 +2,14 @@ import { Elysia, t } from "elysia"; import { biliIDToAID } from "@elysia/lib/bilibiliID"; import { requireAuth } from "@elysia/middlewares/auth"; import { LatestVideosQueue } from "@elysia/lib/mq"; -import { db } from "@core/drizzle"; -import { songs } from "@core/drizzle/main/schema"; +import { db, songs } from "@core/drizzle"; import { eq, and } from "drizzle-orm"; export const addSongHandler = new Elysia() .use(requireAuth) .post( "/song/import/bilibili", - async ({ body, status }) => { + async ({ body, status, user }) => { const id = body.id; const aid = biliIDToAID(id); if (!aid) { @@ -32,7 +31,8 @@ export const addSongHandler = new Elysia() } const job = await LatestVideosQueue.add("getVideoInfo", { aid: aid, - insertSongs: true + insertSongs: true, + uid: user!.unqId }); if (!job.id) { return status(500, { @@ -76,7 +76,7 @@ export const addSongHandler = new Elysia() result: { message: "Video already exists in the songs table." } - } + }; } const job = await LatestVideosQueue.getJob(jobID); if (!job) { diff --git a/packages/backend/routes/song/delete.ts b/packages/backend/routes/song/delete.ts index dd3499a..05cc636 100644 --- a/packages/backend/routes/song/delete.ts +++ b/packages/backend/routes/song/delete.ts @@ -1,14 +1,19 @@ import { Elysia, t } from "elysia"; import { requireAuth } from "@elysia/middlewares/auth"; -import { db } from "@core/drizzle"; -import { songs } from "@core/drizzle/main/schema"; +import { songs, history, db } from "@core/drizzle"; import { eq } from "drizzle-orm"; export const deleteSongHandler = new Elysia({ prefix: "/song" }).use(requireAuth).delete( "/:id", - async ({ params }) => { + async ({ params, user }) => { const id = Number(params.id); await db.update(songs).set({ deleted: true }).where(eq(songs.id, id)); + await db.insert(history).values({ + objectId: id, + changeType: "del-song", + changedBy: user!.unqId, + data: null + }); return { message: `Successfully deleted song ${id}.` }; diff --git a/packages/backend/routes/song/info.ts b/packages/backend/routes/song/info.ts index 23326e8..bc2231b 100644 --- a/packages/backend/routes/song/info.ts +++ b/packages/backend/routes/song/info.ts @@ -1,6 +1,5 @@ import { Elysia, t } from "elysia"; -import { dbMain } from "@core/drizzle"; -import { relations, singer, songs } from "@core/drizzle/main/schema"; +import { db, history, songs } from "@core/drizzle"; import { eq, and } from "drizzle-orm"; import { bv2av } from "@elysia/lib/bilibiliID"; import { requireAuth } from "@elysia/middlewares/auth"; @@ -14,11 +13,7 @@ async function getSongIDFromBiliID(id: string) { } else { return null; } - const songID = await dbMain - .select({ id: songs.id }) - .from(songs) - .where(eq(songs.aid, aid)) - .limit(1); + const songID = await db.select({ id: songs.id }).from(songs).where(eq(songs.aid, aid)).limit(1); if (songID.length > 0) { return songID[0].id; } @@ -38,7 +33,7 @@ async function getSongID(id: string) { } async function getSongInfo(id: number) { - const songInfo = await dbMain + const songInfo = await db .select() .from(songs) .where(and(eq(songs.id, id), eq(songs.deleted, false))) @@ -46,23 +41,6 @@ async function getSongInfo(id: number) { return songInfo[0]; } -async function getSingers(id: number) { - const singers = await dbMain - .select({ - singers: singer.name - }) - .from(relations) - .innerJoin(singer, eq(relations.targetId, singer.id)) - .where( - and( - eq(relations.sourceId, id), - eq(relations.sourceType, "song"), - eq(relations.relation, "sing") - ) - ); - return singers.map((singer) => singer.singers); -} - const songInfoGetHandler = new Elysia({ prefix: "/song" }).get( "/:id/info", async ({ params, status }) => { @@ -81,14 +59,12 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get( message: "Given song cannot be found." }); } - const singers = await getSingers(info.id); return { id: info.id, name: info.name, aid: info.aid, producer: info.producer, duration: info.duration, - singers: singers, cover: info.image || undefined, publishedAt: info.publishedAt }; @@ -101,9 +77,8 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get( aid: t.Union([t.Number(), t.Null()]), producer: t.Union([t.String(), t.Null()]), duration: t.Union([t.Number(), t.Null()]), - singers: t.Array(t.String()), cover: t.Optional(t.String()), - publishedAt: t.Union([t.String(), t.Null()]), + publishedAt: t.Union([t.String(), t.Null()]) }), 404: t.Object({ code: t.String(), @@ -127,7 +102,7 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get( const songInfoUpdateHandler = new Elysia({ prefix: "/song" }).use(requireAuth).patch( "/:id/info", - async ({ params, status, body }) => { + async ({ params, status, body, user }) => { const id = params.id; const songID = await getSongID(id); if (!songID) { @@ -145,16 +120,22 @@ const songInfoUpdateHandler = new Elysia({ prefix: "/song" }).use(requireAuth).p } if (body.name) { - await dbMain.update(songs).set({ name: body.name }).where(eq(songs.id, songID)); + await db.update(songs).set({ name: body.name }).where(eq(songs.id, songID)); } if (body.producer) { - await dbMain + await db .update(songs) .set({ producer: body.producer }) .where(eq(songs.id, songID)) .returning(); } - const updatedData = await dbMain.select().from(songs).where(eq(songs.id, songID)); + const updatedData = await db.select().from(songs).where(eq(songs.id, songID)); + await db.insert(history).values({ + objectId: songID, + changeType: "update-song", + changedBy: user!.unqId, + data: updatedData.length > 0 ? updatedData[0] : null + }); return { message: "Successfully updated song info.", updated: updatedData.length > 0 ? updatedData[0] : null diff --git a/packages/backend/routes/song/milestone.ts b/packages/backend/routes/song/milestone.ts index 5314042..8d04591 100644 --- a/packages/backend/routes/song/milestone.ts +++ b/packages/backend/routes/song/milestone.ts @@ -1,7 +1,6 @@ import { Elysia, t } from "elysia"; -import { dbMain } from "@core/drizzle"; -import { bilibiliMetadata, eta, latestVideoSnapshot } from "@core/drizzle/main/schema"; -import { eq, and, gte, lt, desc } from "drizzle-orm"; +import { db, bilibiliMetadata, eta } from "@core/drizzle"; +import { eq, and, gte, lt } from "drizzle-orm"; import serverTiming from "@elysia/middlewares/timing"; import z from "zod"; import { BiliVideoSchema } from "@elysia/lib/schema"; @@ -9,9 +8,9 @@ import { BiliVideoSchema } from "@elysia/lib/schema"; type MileStoneType = "dendou" | "densetsu" | "shinwa"; const range = { - dendou: [90000, 99999, 2160], - densetsu: [900000, 999999, 8760], - shinwa: [5000000, 9999999, 87600] + dendou: [0, 100000, 2160], + densetsu: [100000, 1000000, 8760], + shinwa: [1000000, 10000000, 43800] }; export const closeMileStoneHandler = new Elysia({ prefix: "/songs" }).use(serverTiming()).get( @@ -21,7 +20,7 @@ export const closeMileStoneHandler = new Elysia({ prefix: "/songs" }).use(server const type = params.type; const min = range[type as MileStoneType][0]; const max = range[type as MileStoneType][1]; - return dbMain + return db .select() .from(eta) .innerJoin(bilibiliMetadata, eq(bilibiliMetadata.aid, eta.aid)) diff --git a/packages/backend/routes/video/eta.ts b/packages/backend/routes/video/eta.ts index 4e570cf..ad0826b 100644 --- a/packages/backend/routes/video/eta.ts +++ b/packages/backend/routes/video/eta.ts @@ -1,6 +1,5 @@ import { Elysia, t } from "elysia"; -import { db } from "@core/drizzle"; -import { eta } from "@core/drizzle/main/schema"; +import { db, eta } from "@core/drizzle"; import { eq } from "drizzle-orm"; import { biliIDToAID } from "@elysia/lib/bilibiliID"; diff --git a/packages/backend/routes/video/metadata.ts b/packages/backend/routes/video/metadata.ts index 84c109f..48968ef 100644 --- a/packages/backend/routes/video/metadata.ts +++ b/packages/backend/routes/video/metadata.ts @@ -1,6 +1,5 @@ import { Elysia, t } from "elysia"; -import { dbMain } from "@core/drizzle"; -import { videoSnapshot } from "@core/drizzle/main/schema"; +import { db, videoSnapshot } from "@core/drizzle"; import { bv2av } from "@elysia/lib/bilibiliID"; import { getVideoInfo } from "@core/net/getVideoInfo"; import { redis } from "@core/db/redis"; @@ -31,7 +30,7 @@ async function insertVideoSnapshot(data: VideoInfoData) { const favorites = data.stat.favorite; const aid = data.aid; - await dbMain.insert(videoSnapshot).values({ + await db.insert(videoSnapshot).values({ aid, views, danmakus, diff --git a/packages/backend/routes/video/snapshots.ts b/packages/backend/routes/video/snapshots.ts index 77c98ea..1855489 100644 --- a/packages/backend/routes/video/snapshots.ts +++ b/packages/backend/routes/video/snapshots.ts @@ -1,6 +1,5 @@ import { Elysia } from "elysia"; -import { dbMain } from "@core/drizzle"; -import { videoSnapshot } from "@core/drizzle/main/schema"; +import { db, videoSnapshot } from "@core/drizzle"; import { bv2av } from "@elysia/lib/bilibiliID"; import { ErrorResponseSchema } from "@elysia/src/schema"; import { eq, desc } from "drizzle-orm"; @@ -26,7 +25,7 @@ export const getVideoSnapshotsHandler = new Elysia({ prefix: "/video" }).get( }); } - const data = await dbMain + const data = await db .select() .from(videoSnapshot) .where(eq(videoSnapshot.aid, aid)) diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 1124ce7..15f6d51 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -1,4 +1,4 @@ -import { Elysia } from "elysia"; +import { Elysia, file } from "elysia"; import { getBindingInfo, logStartup } from "./startMessage"; import { pingHandler } from "@elysia/routes/ping"; import openapi from "@elysiajs/openapi"; @@ -14,6 +14,7 @@ import { getVideoSnapshotsHandler } from "@elysia/routes/video/snapshots"; import { addSongHandler } from "@elysia/routes/song/add"; import { deleteSongHandler } from "@elysia/routes/song/delete"; import { songEtaHandler } from "@elysia/routes/video/eta"; +import "./mq"; const [host, port] = getBindingInfo(); logStartup(host, port); @@ -45,6 +46,14 @@ const app = new Elysia({ .use(addSongHandler) .use(deleteSongHandler) .use(songEtaHandler) + .get("/a", () => file("public/background.jpg")) + .get("/song/:id", ({ redirect, params }) => { + console.log(`/song/${params.id}/info`); + return redirect(`/song/${params.id}/info`, 302); + }) + .get("/video/:id", ({ redirect, params }) => { + return redirect(`/video/${params.id}/info`, 302); + }) .listen(15412); export const VERSION = "0.7.0"; diff --git a/packages/backend/src/mq.ts b/packages/backend/src/mq.ts new file mode 100644 index 0000000..6ccbcfd --- /dev/null +++ b/packages/backend/src/mq.ts @@ -0,0 +1,21 @@ +import { db, history } from "@core/drizzle"; +import { ConnectionOptions, QueueEvents, QueueEventsListener } from "bullmq"; +import { redis } from "bun"; + +interface CustomListener extends QueueEventsListener { + addSong: (args: { uid: string; songID: number }, id: string) => void; +} +const queueEvents = new QueueEvents("latestVideos", { + connection: redis as ConnectionOptions +}); +queueEvents.on( + "addSong", + async ({ uid, songID }: { uid: string; songID: number }) => { + await db.insert(history).values({ + objectId: songID, + changeType: "add-song", + changedBy: uid, + data: null + }); + } +); diff --git a/packages/backend/src/onAfterHandle.ts b/packages/backend/src/onAfterHandle.ts index d0ba41b..9802fe6 100644 --- a/packages/backend/src/onAfterHandle.ts +++ b/packages/backend/src/onAfterHandle.ts @@ -1,4 +1,4 @@ -import Elysia from "elysia"; +import Elysia, { ElysiaFile } from "elysia"; const encoder = new TextEncoder(); @@ -11,20 +11,14 @@ export const onAfterHandler = new Elysia().onAfterHandle( const requestJson = contentType.includes("application/json"); const isBrowser = !requestJson && (accept.includes("text/html") || secFetchMode === "navigate"); - const responseValueType = typeof responseValue; - const isObject = responseValueType === "object"; + const isObject = typeof responseValue === "object"; if (!isObject) { - const response = { - message: responseValue - }; - const text = isBrowser ? JSON.stringify(response, null, 2) : JSON.stringify(response); - return new Response(encoder.encode(text), { - headers: { - "Content-Type": "application/json; charset=utf-8" - } - }); + return; } - const realResponse = responseValue as Record; + if (responseValue instanceof ElysiaFile || responseValue instanceof Response) { + return; + } + const realResponse = responseValue as Record; if (realResponse.code) { const text = isBrowser ? JSON.stringify(realResponse.response, null, 2) diff --git a/packages/core/db/dbNew.ts b/packages/core/db/dbNew.ts index b14cf1b..15e537d 100644 --- a/packages/core/db/dbNew.ts +++ b/packages/core/db/dbNew.ts @@ -1,8 +1,4 @@ import postgres from "postgres"; -import { postgresConfigCred, postgresConfig } from "./pgConfigNew"; +import { postgresConfig } from "./pgConfigNew"; -export const sql = postgres(postgresConfig); - -export const sqlCred = postgres(postgresConfigCred); - -export const sqlTest = postgres(postgresConfig); +export const sql = postgres(postgresConfig); \ No newline at end of file diff --git a/packages/core/db/snapshots/getClosetSnapshot.ts b/packages/core/db/snapshots/getClosetSnapshot.ts index 172ef0c..e96ed88 100644 --- a/packages/core/db/snapshots/getClosetSnapshot.ts +++ b/packages/core/db/snapshots/getClosetSnapshot.ts @@ -1,8 +1,8 @@ -import { dbMain } from "@core/drizzle"; +import { db } from "@core/drizzle"; import { sql } from "drizzle-orm"; export const getClosestSnapshot = async (aid: number, targetTime: Date) => { - const closest = await dbMain.execute<{ created_at: Date; views: number }>(sql` + const closest = await db.execute<{ created_at: Date; views: number }>(sql` SELECT created_at, views FROM ( (SELECT created_at, views, 'later' AS type diff --git a/packages/core/db/snapshots/getLatestSnapshot.ts b/packages/core/db/snapshots/getLatestSnapshot.ts index 0036db9..b8f502f 100644 --- a/packages/core/db/snapshots/getLatestSnapshot.ts +++ b/packages/core/db/snapshots/getLatestSnapshot.ts @@ -1,9 +1,12 @@ -import { dbMain } from "@core/drizzle"; -import { latestVideoSnapshot } from "@core/drizzle/main/schema"; +import { db, latestVideoSnapshot } from "@core/drizzle"; import { eq } from "drizzle-orm"; export const getLatestSnapshot = async (aid: number) => { - const result = await dbMain.select().from(latestVideoSnapshot).where(eq(latestVideoSnapshot.aid, aid)).limit(1); + const result = await db + .select() + .from(latestVideoSnapshot) + .where(eq(latestVideoSnapshot.aid, aid)) + .limit(1); if (result.length === 0) { return null; } diff --git a/packages/core/db/snapshots/milestone.ts b/packages/core/db/snapshots/milestone.ts index 57eaf3d..e07c876 100644 --- a/packages/core/db/snapshots/milestone.ts +++ b/packages/core/db/snapshots/milestone.ts @@ -1,5 +1,4 @@ -import { dbMain } from "@core/drizzle"; -import { eta as etaTable } from "@core/drizzle/main/schema"; +import { db, eta as etaTable } from "@core/drizzle"; import { eq } from "drizzle-orm"; import { MINUTE, HOUR, getClosetMilestone } from "@core/lib"; import { getLatestSnapshot, getClosestSnapshot } from "@core/db"; @@ -34,7 +33,7 @@ export const getGroundTruthMilestoneETA = async ( }; export const getMilestoneETA = async (aid: number) => { - const data = await dbMain.select().from(etaTable).where(eq(etaTable.aid, aid)).limit(1); + const data = await db.select().from(etaTable).where(eq(etaTable.aid, aid)).limit(1); if (data.length > 0) { return data[0].eta; } diff --git a/packages/core/drizzle/cred/0000_moaning_shotgun.sql b/packages/core/drizzle/cred/0000_moaning_shotgun.sql deleted file mode 100644 index d371bc5..0000000 --- a/packages/core/drizzle/cred/0000_moaning_shotgun.sql +++ /dev/null @@ -1,44 +0,0 @@ --- Current sql file was generated after introspecting the database --- If you want to run this migration please uncomment this code before executing migrations -/* -CREATE SEQUENCE "public"."captcha_difficulty_settings_id_seq" INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1;--> statement-breakpoint -CREATE SEQUENCE "public"."users_id_seq" INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1;--> statement-breakpoint -CREATE TABLE "captcha_difficulty_settings" ( - "id" integer DEFAULT nextval('captcha_difficulty_settings_id_seq'::regclass) NOT NULL, - "method" text NOT NULL, - "path" text NOT NULL, - "duration" real NOT NULL, - "threshold" integer NOT NULL, - "difficulty" integer NOT NULL, - "global" boolean NOT NULL -); ---> statement-breakpoint -CREATE TABLE "login_sessions" ( - "id" text NOT NULL, - "uid" integer NOT NULL, - "created_at" timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL, - "expire_at" timestamp with time zone, - "last_used_at" timestamp with time zone, - "ip_address" "inet", - "user_agent" text, - "deactivated_at" timestamp with time zone -); ---> statement-breakpoint -CREATE TABLE "users" ( - "id" integer DEFAULT nextval('users_id_seq'::regclass) NOT NULL, - "nickname" text, - "username" text NOT NULL, - "password" text NOT NULL, - "unq_id" text DEFAULT gen_random_uuid() NOT NULL, - "role" text DEFAULT 'USER' NOT NULL, - "created_at" timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL -); ---> statement-breakpoint -CREATE UNIQUE INDEX "captcha_difficulty_settings_pkey" ON "captcha_difficulty_settings" USING btree ("id" int4_ops);--> statement-breakpoint -CREATE INDEX "inx_login-sessions_uid" ON "login_sessions" USING btree ("uid" int4_ops);--> statement-breakpoint -CREATE UNIQUE INDEX "login_sessions_pkey" ON "login_sessions" USING btree ("id" text_ops);--> statement-breakpoint -CREATE UNIQUE INDEX "users_pkey" ON "users" USING btree ("id" int4_ops);--> statement-breakpoint -CREATE UNIQUE INDEX "users_pkey1" ON "users" USING btree ("id" int4_ops);--> statement-breakpoint -CREATE UNIQUE INDEX "users_unq_id_key" ON "users" USING btree ("unq_id" text_ops);--> statement-breakpoint -CREATE UNIQUE INDEX "users_username_key" ON "users" USING btree ("username" text_ops); -*/ \ No newline at end of file diff --git a/packages/core/drizzle/cred/meta/0000_snapshot.json b/packages/core/drizzle/cred/meta/0000_snapshot.json deleted file mode 100644 index a420ac1..0000000 --- a/packages/core/drizzle/cred/meta/0000_snapshot.json +++ /dev/null @@ -1,350 +0,0 @@ -{ - "id": "00000000-0000-0000-0000-000000000000", - "prevId": "", - "version": "7", - "dialect": "postgresql", - "tables": { - "public.captcha_difficulty_settings": { - "name": "captcha_difficulty_settings", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "integer", - "primaryKey": false, - "notNull": true, - "default": "nextval('captcha_difficulty_settings_id_seq'::regclass)" - }, - "method": { - "name": "method", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "path": { - "name": "path", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "duration": { - "name": "duration", - "type": "real", - "primaryKey": false, - "notNull": true - }, - "threshold": { - "name": "threshold", - "type": "integer", - "primaryKey": false, - "notNull": true - }, - "difficulty": { - "name": "difficulty", - "type": "integer", - "primaryKey": false, - "notNull": true - }, - "global": { - "name": "global", - "type": "boolean", - "primaryKey": false, - "notNull": true - } - }, - "indexes": { - "captcha_difficulty_settings_pkey": { - "name": "captcha_difficulty_settings_pkey", - "columns": [ - { - "expression": "id", - "asc": true, - "nulls": "last", - "opclass": "int4_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - } - }, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {}, - "policies": {}, - "isRLSEnabled": false - }, - "public.login_sessions": { - "name": "login_sessions", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "uid": { - "name": "uid", - "type": "integer", - "primaryKey": false, - "notNull": true - }, - "created_at": { - "name": "created_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": true, - "default": "CURRENT_TIMESTAMP" - }, - "expire_at": { - "name": "expire_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": false - }, - "last_used_at": { - "name": "last_used_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": false - }, - "ip_address": { - "name": "ip_address", - "type": "inet", - "primaryKey": false, - "notNull": false - }, - "user_agent": { - "name": "user_agent", - "type": "text", - "primaryKey": false, - "notNull": false - }, - "deactivated_at": { - "name": "deactivated_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": false - } - }, - "indexes": { - "inx_login-sessions_uid": { - "name": "inx_login-sessions_uid", - "columns": [ - { - "expression": "uid", - "asc": true, - "nulls": "last", - "opclass": "int4_ops", - "isExpression": false - } - ], - "isUnique": false, - "concurrently": false, - "method": "btree", - "with": {} - }, - "login_sessions_pkey": { - "name": "login_sessions_pkey", - "columns": [ - { - "expression": "id", - "asc": true, - "nulls": "last", - "opclass": "text_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - } - }, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {}, - "policies": {}, - "isRLSEnabled": false - }, - "public.users": { - "name": "users", - "schema": "", - "columns": { - "id": { - "name": "id", - "type": "integer", - "primaryKey": false, - "notNull": true, - "default": "nextval('users_id_seq'::regclass)" - }, - "nickname": { - "name": "nickname", - "type": "text", - "primaryKey": false, - "notNull": false - }, - "username": { - "name": "username", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "password": { - "name": "password", - "type": "text", - "primaryKey": false, - "notNull": true - }, - "unq_id": { - "name": "unq_id", - "type": "text", - "primaryKey": false, - "notNull": true, - "default": "gen_random_uuid()" - }, - "role": { - "name": "role", - "type": "text", - "primaryKey": false, - "notNull": true, - "default": "'USER'" - }, - "created_at": { - "name": "created_at", - "type": "timestamp with time zone", - "primaryKey": false, - "notNull": true, - "default": "CURRENT_TIMESTAMP" - } - }, - "indexes": { - "users_pkey": { - "name": "users_pkey", - "columns": [ - { - "expression": "id", - "asc": true, - "nulls": "last", - "opclass": "int4_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - }, - "users_pkey1": { - "name": "users_pkey1", - "columns": [ - { - "expression": "id", - "asc": true, - "nulls": "last", - "opclass": "int4_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - }, - "users_unq_id_key": { - "name": "users_unq_id_key", - "columns": [ - { - "expression": "unq_id", - "asc": true, - "nulls": "last", - "opclass": "text_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - }, - "users_username_key": { - "name": "users_username_key", - "columns": [ - { - "expression": "username", - "asc": true, - "nulls": "last", - "opclass": "text_ops", - "isExpression": false - } - ], - "isUnique": true, - "concurrently": false, - "method": "btree", - "with": {} - } - }, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {}, - "policies": {}, - "isRLSEnabled": false - } - }, - "enums": {}, - "schemas": {}, - "sequences": { - "public.captcha_difficulty_settings_id_seq": { - "name": "captcha_difficulty_settings_id_seq", - "schema": "public", - "startWith": "1", - "minValue": "1", - "maxValue": "2147483647", - "increment": "1", - "cycle": false, - "cache": "1" - }, - "public.users_id_seq": { - "name": "users_id_seq", - "schema": "public", - "startWith": "1", - "minValue": "1", - "maxValue": "2147483647", - "increment": "1", - "cycle": false, - "cache": "1" - } - }, - "roles": {}, - "policies": {}, - "views": {}, - "_meta": { - "schemas": {}, - "tables": {}, - "columns": {} - }, - "internal": { - "tables": { - "captcha_difficulty_settings": { - "columns": { - "id": { - "isDefaultAnExpression": true - } - } - }, - "users": { - "columns": { - "id": { - "isDefaultAnExpression": true - } - } - } - } - } -} diff --git a/packages/core/drizzle/cred/meta/_journal.json b/packages/core/drizzle/cred/meta/_journal.json deleted file mode 100644 index 574cfc3..0000000 --- a/packages/core/drizzle/cred/meta/_journal.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "version": "7", - "dialect": "postgresql", - "entries": [ - { - "idx": 0, - "version": "7", - "when": 1750513073792, - "tag": "0000_moaning_shotgun", - "breakpoints": true - } - ] -} diff --git a/packages/core/drizzle/cred/relations.ts b/packages/core/drizzle/cred/relations.ts deleted file mode 100644 index 0ed80c7..0000000 --- a/packages/core/drizzle/cred/relations.ts +++ /dev/null @@ -1,2 +0,0 @@ -import { relations } from "drizzle-orm/relations"; -import {} from "./schema"; diff --git a/packages/core/drizzle/cred/schema.ts b/packages/core/drizzle/cred/schema.ts deleted file mode 100644 index 0b9e7b1..0000000 --- a/packages/core/drizzle/cred/schema.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { - pgTable, - uniqueIndex, - integer, - text, - real, - boolean, - index, - timestamp, - inet, - pgSequence -} from "drizzle-orm/pg-core"; -import { sql } from "drizzle-orm"; - -export const captchaDifficultySettingsIdSeq = pgSequence("captcha_difficulty_settings_id_seq", { - startWith: "1", - increment: "1", - minValue: "1", - maxValue: "2147483647", - cache: "1", - cycle: false -}); -export const usersIdSeq = pgSequence("users_id_seq", { - startWith: "1", - increment: "1", - minValue: "1", - maxValue: "2147483647", - cache: "1", - cycle: false -}); - -export const captchaDifficultySettings = pgTable( - "captcha_difficulty_settings", - { - id: integer() - .default(sql`nextval('captcha_difficulty_settings_id_seq'::regclass)`) - .notNull(), - method: text().notNull(), - path: text().notNull(), - duration: real().notNull(), - threshold: integer().notNull(), - difficulty: integer().notNull(), - global: boolean().notNull() - }, - (table) => [ - uniqueIndex("captcha_difficulty_settings_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")) - ] -); - -export const loginSessions = pgTable( - "login_sessions", - { - id: text().notNull(), - uid: integer().notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: "string" }) - .default(sql`CURRENT_TIMESTAMP`) - .notNull(), - expireAt: timestamp("expire_at", { withTimezone: true, mode: "string" }), - lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: "string" }), - ipAddress: inet("ip_address"), - userAgent: text("user_agent"), - deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: "string" }) - }, - (table) => [ - index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")), - uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")) - ] -); - -export const users = pgTable( - "users", - { - id: integer() - .default(sql`nextval('users_id_seq'::regclass)`) - .notNull(), - nickname: text(), - username: text().notNull(), - password: text().notNull(), - unqId: text("unq_id") - .default(sql`gen_random_uuid()`) - .notNull(), - role: text().default("USER").notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: "string" }) - .default(sql`CURRENT_TIMESTAMP`) - .notNull() - }, - (table) => [ - uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), - uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")), - uniqueIndex("users_unq_id_key").using("btree", table.unqId.asc().nullsLast().op("text_ops")), - uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")) - ] -); diff --git a/packages/core/drizzle/index.ts b/packages/core/drizzle/index.ts index 453212f..34d947f 100644 --- a/packages/core/drizzle/index.ts +++ b/packages/core/drizzle/index.ts @@ -1,8 +1,8 @@ "use server"; import { drizzle } from "drizzle-orm/postgres-js"; -import { sqlCred, sql } from "@core/db/dbNew"; +import { sql } from "@core/db/dbNew"; -export const dbMain = drizzle(sql); -export const dbCred = drizzle(sqlCred); -export const db = drizzle(sql); \ No newline at end of file +export const db = drizzle(sql); +export * from "./main/schema"; +export * from "./type"; \ No newline at end of file diff --git a/packages/core/drizzle/main/relations.ts b/packages/core/drizzle/main/relations.ts index f7a9112..d491e2d 100644 --- a/packages/core/drizzle/main/relations.ts +++ b/packages/core/drizzle/main/relations.ts @@ -1,24 +1,15 @@ import { relations } from "drizzle-orm/relations"; -import { songs, relationSinger, singer, relationsProducer } from "./schema"; +import { usersInCredentials, history, songs, relationsProducer, singer, relationSinger } from "./schema"; -export const relationSingerRelations = relations(relationSinger, ({one}) => ({ - song: one(songs, { - fields: [relationSinger.songId], - references: [songs.id] - }), - singer: one(singer, { - fields: [relationSinger.singerId], - references: [singer.id] +export const historyRelations = relations(history, ({one}) => ({ + usersInCredential: one(usersInCredentials, { + fields: [history.changedBy], + references: [usersInCredentials.unqId] }), })); -export const songsRelations = relations(songs, ({many}) => ({ - relationSingers: many(relationSinger), - relationsProducers: many(relationsProducer), -})); - -export const singerRelations = relations(singer, ({many}) => ({ - relationSingers: many(relationSinger), +export const usersInCredentialsRelations = relations(usersInCredentials, ({many}) => ({ + histories: many(history), })); export const relationsProducerRelations = relations(relationsProducer, ({one}) => ({ @@ -26,4 +17,24 @@ export const relationsProducerRelations = relations(relationsProducer, ({one}) = fields: [relationsProducer.songId], references: [songs.id] }), +})); + +export const songsRelations = relations(songs, ({many}) => ({ + relationsProducers: many(relationsProducer), + relationSingers: many(relationSinger), +})); + +export const relationSingerRelations = relations(relationSinger, ({one}) => ({ + singer: one(singer, { + fields: [relationSinger.singerId], + references: [singer.id] + }), + song: one(songs, { + fields: [relationSinger.songId], + references: [songs.id] + }), +})); + +export const singerRelations = relations(singer, ({many}) => ({ + relationSingers: many(relationSinger), })); \ No newline at end of file diff --git a/packages/core/drizzle/main/schema.ts b/packages/core/drizzle/main/schema.ts index 189e8fb..0f06a05 100644 --- a/packages/core/drizzle/main/schema.ts +++ b/packages/core/drizzle/main/schema.ts @@ -1,4 +1,4 @@ -import { pgTable, index, unique, serial, bigint, text, timestamp, uniqueIndex, integer, varchar, smallint, jsonb, boolean, foreignKey, bigserial, real, pgSchema, inet, pgSequence } from "drizzle-orm/pg-core" +import { pgTable, pgSchema, uniqueIndex, check, integer, text, timestamp, foreignKey, serial, bigint, jsonb, index, inet, varchar, smallint, real, unique, boolean, bigserial, pgSequence } from "drizzle-orm/pg-core" import { sql } from "drizzle-orm" export const credentials = pgSchema("credentials"); @@ -6,28 +6,69 @@ export const userRoleInCredentials = credentials.enum("user_role", ['ADMIN', 'US export const allDataIdSeq = pgSequence("all_data_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) export const labelingResultIdSeq = pgSequence("labeling_result_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) +export const relationSingerIdSeq = pgSequence("relation_singer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) +export const relationsProducerIdSeq = pgSequence("relations_producer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) export const songsIdSeq = pgSequence("songs_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) export const videoSnapshotIdSeq = pgSequence("video_snapshot_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) export const viewsIncrementRateIdSeq = pgSequence("views_increment_rate_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "9223372036854775807", cache: "1", cycle: false }) -export const relationSingerIdSeq = pgSequence("relation_singer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) -export const relationsProducerIdSeq = pgSequence("relations_producer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) export const usersIdSeqInCredentials = credentials.sequence("users_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false }) -export const relations = pgTable("relations", { +export const usersInCredentials = credentials.table("users", { + id: integer().default(sql`nextval('credentials.users_id_seq'::regclass)`).notNull(), + nickname: text(), + username: text().notNull(), + password: text().notNull(), + unqId: text("unq_id").notNull(), + role: userRoleInCredentials().default('USER').notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), +}, (table) => [ + uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), + uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")), + uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")), + check("users_id_not_null", sql`NOT NULL id`), + check("users_username_not_null", sql`NOT NULL username`), + check("users_password_not_null", sql`NOT NULL password`), + check("users_unq_id_not_null", sql`NOT NULL unq_id`), + check("users_role_not_null", sql`NOT NULL role`), + check("users_created_at_not_null", sql`NOT NULL created_at`), +]); + +export const history = pgTable("history", { id: serial().primaryKey().notNull(), // You can use { mode: "bigint" } if numbers are exceeding js number limitations - sourceId: bigint("source_id", { mode: "number" }).notNull(), - sourceType: text("source_type").notNull(), - // You can use { mode: "bigint" } if numbers are exceeding js number limitations - targetId: bigint("target_id", { mode: "number" }).notNull(), - targetType: text("target_type").notNull(), - relation: text().notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + objectId: bigint("object_id", { mode: "number" }).notNull(), + changeType: text("change_type").notNull(), + changedAt: timestamp("changed_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + changedBy: text("changed_by").notNull(), + data: jsonb(), }, (table) => [ - index("idx_relations_source_id_source_type_relation").using("btree", table.sourceId.asc().nullsLast().op("int8_ops"), table.sourceType.asc().nullsLast().op("int8_ops"), table.relation.asc().nullsLast().op("text_ops")), - index("idx_relations_target_id_target_type_relation").using("btree", table.targetId.asc().nullsLast().op("text_ops"), table.targetType.asc().nullsLast().op("text_ops"), table.relation.asc().nullsLast().op("text_ops")), - unique("unq_relations").on(table.sourceId, table.sourceType, table.targetId, table.targetType, table.relation), + foreignKey({ + columns: [table.changedBy], + foreignColumns: [usersInCredentials.unqId], + name: "rel_history_changed_by" + }), + check("history_id_not_null", sql`NOT NULL id`), + check("history_object_id_not_null", sql`NOT NULL object_id`), + check("history_change_type_not_null", sql`NOT NULL change_type`), + check("history_changed_at_not_null", sql`NOT NULL changed_at`), + check("history_changed_by_not_null", sql`NOT NULL changed_by`), +]); + +export const loginSessionsInCredentials = credentials.table("login_sessions", { + id: text().notNull(), + uid: integer().notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + expireAt: timestamp("expire_at", { withTimezone: true, mode: 'string' }), + lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: 'string' }), + ipAddress: inet("ip_address"), + userAgent: text("user_agent"), + deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: 'string' }), +}, (table) => [ + index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")), + uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")), + check("login_sessions_id_not_null", sql`NOT NULL id`), + check("login_sessions_uid_not_null", sql`NOT NULL uid`), + check("login_sessions_created_at_not_null", sql`NOT NULL created_at`), ]); export const bilibiliMetadata = pgTable("bilibili_metadata", { @@ -52,6 +93,9 @@ export const bilibiliMetadata = pgTable("bilibili_metadata", { index("idx_all-data_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")), index("idx_bili-meta_status").using("btree", table.status.asc().nullsLast().op("int4_ops")), uniqueIndex("unq_all-data_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), + check("bilibili_metadata_id_not_null", sql`NOT NULL id`), + check("bilibili_metadata_aid_not_null", sql`NOT NULL aid`), + check("bilibili_metadata_status_not_null", sql`NOT NULL status`), ]); export const humanClassifiedLables = pgTable("human_classified_lables", { @@ -66,22 +110,41 @@ export const humanClassifiedLables = pgTable("human_classified_lables", { index("idx_classified-labels-human_author").using("btree", table.uid.asc().nullsLast().op("int4_ops")), index("idx_classified-labels-human_created-at").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), index("idx_classified-labels-human_label").using("btree", table.label.asc().nullsLast().op("int2_ops")), + check("human_classified_lables_id_not_null", sql`NOT NULL id`), + check("human_classified_lables_aid_not_null", sql`NOT NULL aid`), + check("human_classified_lables_uid_not_null", sql`NOT NULL uid`), + check("human_classified_lables_label_not_null", sql`NOT NULL label`), + check("human_classified_lables_created_at_not_null", sql`NOT NULL created_at`), ]); -export const singer = pgTable("singer", { - id: serial().primaryKey().notNull(), - name: text().notNull(), -}); - -export const history = pgTable("history", { - id: serial().primaryKey().notNull(), +export const videoSnapshot = pgTable("video_snapshot", { + id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + views: integer().notNull(), + coins: integer(), + likes: integer(), + favorites: integer(), + shares: integer(), + danmakus: integer(), // You can use { mode: "bigint" } if numbers are exceeding js number limitations - objectId: bigint("object_id", { mode: "number" }).notNull(), - changeType: text("change_type").notNull(), - changedAt: timestamp("changed_at", { withTimezone: true, mode: 'string' }).notNull(), - changedBy: integer("changed_by").notNull(), - data: jsonb().notNull(), -}); + aid: bigint({ mode: "number" }).notNull(), + replies: integer(), +}, (table) => [ + index("idx_vid_snapshot_aid_created_at").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("int8_ops")), + index("idx_vid_snapshot_time").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), + check("video_snapshot_id_not_null", sql`NOT NULL id`), + check("video_snapshot_created_at_not_null", sql`NOT NULL created_at`), + check("video_snapshot_views_not_null", sql`NOT NULL views`), + check("video_snapshot_aid_not_null", sql`NOT NULL aid`), +]); + +export const producer = pgTable("producer", { + id: integer().primaryKey().notNull(), + name: text().notNull(), +}, (table) => [ + check("producer_id_not_null", sql`NOT NULL id`), + check("producer_name_not_null", sql`NOT NULL name`), +]); export const labellingResult = pgTable("labelling_result", { id: integer().default(sql`nextval('labeling_result_id_seq'::regclass)`).notNull(), @@ -97,6 +160,11 @@ export const labellingResult = pgTable("labelling_result", { index("idx_labelling_aid-label").using("btree", table.aid.asc().nullsLast().op("int2_ops"), table.label.asc().nullsLast().op("int2_ops")), uniqueIndex("labeling_result_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), uniqueIndex("unq_labelling-result_aid_model-version").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.modelVersion.asc().nullsLast().op("int8_ops")), + check("labelling_result_id_not_null", sql`NOT NULL id`), + check("labelling_result_aid_not_null", sql`NOT NULL aid`), + check("labelling_result_label_not_null", sql`NOT NULL label`), + check("labelling_result_model_version_not_null", sql`NOT NULL model_version`), + check("labelling_result_created_at_not_null", sql`NOT NULL created_at`), ]); export const latestVideoSnapshot = pgTable("latest_video_snapshot", { @@ -113,6 +181,80 @@ export const latestVideoSnapshot = pgTable("latest_video_snapshot", { }, (table) => [ index("idx_latest-video-snapshot_time").using("btree", table.time.asc().nullsLast().op("timestamptz_ops")), index("idx_latest-video-snapshot_views").using("btree", table.views.asc().nullsLast().op("int4_ops")), + check("latest_video_snapshot_aid_not_null", sql`NOT NULL aid`), + check("latest_video_snapshot_time_not_null", sql`NOT NULL "time"`), + check("latest_video_snapshot_views_not_null", sql`NOT NULL views`), + check("latest_video_snapshot_coins_not_null", sql`NOT NULL coins`), + check("latest_video_snapshot_likes_not_null", sql`NOT NULL likes`), + check("latest_video_snapshot_favorites_not_null", sql`NOT NULL favorites`), + check("latest_video_snapshot_replies_not_null", sql`NOT NULL replies`), + check("latest_video_snapshot_danmakus_not_null", sql`NOT NULL danmakus`), + check("latest_video_snapshot_shares_not_null", sql`NOT NULL shares`), +]); + +export const relationsProducer = pgTable("relations_producer", { + id: integer().default(sql`nextval('relations_producer_id_seq'::regclass)`).primaryKey().notNull(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + songId: bigint("song_id", { mode: "number" }).notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + producerId: integer("producer_id").notNull(), + updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), +}, (table) => [ + foreignKey({ + columns: [table.songId], + foreignColumns: [songs.id], + name: "fkey_relations_producer_songs_id" + }), + check("relations_producer_id_not_null", sql`NOT NULL id`), + check("relations_producer_song_id_not_null", sql`NOT NULL song_id`), + check("relations_producer_created_at_not_null", sql`NOT NULL created_at`), + check("relations_producer_producer_id_not_null", sql`NOT NULL producer_id`), + check("relations_producer_updated_at_not_null", sql`NOT NULL updated_at`), +]); + +export const eta = pgTable("eta", { + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + aid: bigint({ mode: "number" }).primaryKey().notNull(), + eta: real().notNull(), + speed: real().notNull(), + currentViews: integer("current_views").notNull(), + updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(), +}, (table) => [ + index("idx_eta_eta_current_views").using("btree", table.eta.asc().nullsLast().op("int4_ops"), table.currentViews.asc().nullsLast().op("int4_ops")), + check("eta_aid_not_null", sql`NOT NULL aid`), + check("eta_eta_not_null", sql`NOT NULL eta`), + check("eta_speed_not_null", sql`NOT NULL speed`), + check("eta_current_views_not_null", sql`NOT NULL current_views`), + check("eta_updated_at_not_null", sql`NOT NULL updated_at`), +]); + +export const bilibiliUser = pgTable("bilibili_user", { + id: serial().primaryKey().notNull(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + uid: bigint({ mode: "number" }).notNull(), + username: text().notNull(), + desc: text().notNull(), + fans: integer().notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), +}, (table) => [ + index("idx_bili-user_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")), + unique("unq_bili-user_uid").on(table.uid), + check("bilibili_user_id_not_null", sql`NOT NULL id`), + check("bilibili_user_uid_not_null", sql`NOT NULL uid`), + check("bilibili_user_username_not_null", sql`NOT NULL username`), + check("bilibili_user_desc_not_null", sql`NOT NULL "desc"`), + check("bilibili_user_fans_not_null", sql`NOT NULL fans`), + check("bilibili_user_created_at_not_null", sql`NOT NULL created_at`), + check("bilibili_user_updated_at_not_null", sql`NOT NULL updated_at`), +]); + +export const singer = pgTable("singer", { + id: serial().primaryKey().notNull(), + name: text().notNull(), +}, (table) => [ + check("singer_id_not_null", sql`NOT NULL id`), + check("singer_name_not_null", sql`NOT NULL name`), ]); export const songs = pgTable("songs", { @@ -131,71 +273,16 @@ export const songs = pgTable("songs", { image: text(), producer: text(), }, (table) => [ - index("idx_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), index("idx_hash_songs_aid").using("hash", table.aid.asc().nullsLast().op("int8_ops")), index("idx_netease_id").using("btree", table.neteaseId.asc().nullsLast().op("int8_ops")), index("idx_published_at").using("btree", table.publishedAt.asc().nullsLast().op("timestamptz_ops")), index("idx_type").using("btree", table.type.asc().nullsLast().op("int2_ops")), uniqueIndex("unq_songs_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), uniqueIndex("unq_songs_netease_id").using("btree", table.neteaseId.asc().nullsLast().op("int8_ops")), -]); - -export const videoSnapshot = pgTable("video_snapshot", { - id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - views: integer().notNull(), - coins: integer(), - likes: integer(), - favorites: integer(), - shares: integer(), - danmakus: integer(), - // You can use { mode: "bigint" } if numbers are exceeding js number limitations - aid: bigint({ mode: "number" }).notNull(), - replies: integer(), -}, (table) => [ - index("idx_vid_snapshot_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), - index("idx_vid_snapshot_aid_created_at").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("timestamptz_ops")), - index("idx_vid_snapshot_time").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), - index("idx_vid_snapshot_views").using("btree", table.views.asc().nullsLast().op("int4_ops")), - uniqueIndex("video_snapshot_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), -]); - -export const bilibiliUser = pgTable("bilibili_user", { - id: serial().primaryKey().notNull(), - // You can use { mode: "bigint" } if numbers are exceeding js number limitations - uid: bigint({ mode: "number" }).notNull(), - username: text().notNull(), - desc: text().notNull(), - fans: integer().notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), -}, (table) => [ - index("idx_bili-user_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")), - unique("unq_bili-user_uid").on(table.uid), -]); - -export const producer = pgTable("producer", { - id: integer().primaryKey().notNull(), - name: text().notNull(), -}); - -export const relationSinger = pgTable("relation_singer", { - id: integer().default(sql`nextval('relation_singer_id_seq'::regclass)`).primaryKey().notNull(), - songId: integer("song_id").notNull(), - singerId: integer("singer_id").notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), -}, (table) => [ - foreignKey({ - columns: [table.songId], - foreignColumns: [songs.id], - name: "fkey_song_id" - }), - foreignKey({ - columns: [table.singerId], - foreignColumns: [singer.id], - name: "fkey_singer_id" - }), + check("songs_id_not_null", sql`NOT NULL id`), + check("songs_created_at_not_null", sql`NOT NULL created_at`), + check("songs_updated_at_not_null", sql`NOT NULL updated_at`), + check("songs_deleted_not_null", sql`NOT NULL deleted`), ]); export const snapshotSchedule = pgTable("snapshot_schedule", { @@ -208,65 +295,58 @@ export const snapshotSchedule = pgTable("snapshot_schedule", { finishedAt: timestamp("finished_at", { withTimezone: true, mode: 'string' }), status: text().default('pending').notNull(), }, (table) => [ - index("idx_snapshot_schedule_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), - index("idx_snapshot_schedule_started-at_status_type").using("btree", table.startedAt.asc().nullsLast().op("text_ops"), table.status.asc().nullsLast().op("timestamptz_ops"), table.type.asc().nullsLast().op("timestamptz_ops")), - index("idx_snapshot_schedule_started_at").using("btree", table.startedAt.asc().nullsLast().op("timestamptz_ops")), - index("idx_snapshot_schedule_status").using("btree", table.status.asc().nullsLast().op("text_ops")), - index("idx_snapshot_schedule_status_type_aid").using("btree", table.status.asc().nullsLast().op("int8_ops"), table.type.asc().nullsLast().op("int8_ops"), table.aid.asc().nullsLast().op("text_ops")), - index("idx_snapshot_schedule_type").using("btree", table.type.asc().nullsLast().op("text_ops")), + index("idx_snapshot_schedule_aid_status_type").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.status.asc().nullsLast().op("text_ops"), table.type.asc().nullsLast().op("text_ops")), + index("idx_snapshot_schedule_status_started_at").using("btree", table.status.asc().nullsLast().op("timestamptz_ops"), table.startedAt.asc().nullsLast().op("text_ops")), uniqueIndex("snapshot_schedule_pkey").using("btree", table.id.asc().nullsLast().op("int8_ops")), + check("snapshot_schedule_id_not_null", sql`NOT NULL id`), + check("snapshot_schedule_aid_not_null", sql`NOT NULL aid`), + check("snapshot_schedule_created_at_not_null", sql`NOT NULL created_at`), + check("snapshot_schedule_status_not_null", sql`NOT NULL status`), ]); -export const eta = pgTable("eta", { +export const relationSinger = pgTable("relation_singer", { + id: integer().default(sql`nextval('relation_singer_id_seq'::regclass)`).primaryKey().notNull(), // You can use { mode: "bigint" } if numbers are exceeding js number limitations - aid: bigint({ mode: "number" }).primaryKey().notNull(), - eta: real().notNull(), - speed: real().notNull(), - currentViews: integer("current_views").notNull(), - updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(), -}, (table) => [ - index("idx_eta_eta").using("btree", table.eta.asc().nullsLast().op("float4_ops")), -]); - -export const loginSessionsInCredentials = credentials.table("login_sessions", { - id: text().notNull(), - uid: integer().notNull(), + songId: bigint("song_id", { mode: "number" }).notNull(), + singerId: integer("singer_id").notNull(), createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - expireAt: timestamp("expire_at", { withTimezone: true, mode: 'string' }), - lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: 'string' }), - ipAddress: inet("ip_address"), - userAgent: text("user_agent"), - deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: 'string' }), -}, (table) => [ - index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")), - uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")), -]); - -export const usersInCredentials = credentials.table("users", { - id: integer().default(sql`nextval('credentials.users_id_seq'::regclass)`).notNull(), - nickname: text(), - username: text().notNull(), - password: text().notNull(), - unqId: text("unq_id").notNull(), - role: userRoleInCredentials().default('USER').notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), -}, (table) => [ - uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), - uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")), - uniqueIndex("users_unq_id_key").using("btree", table.unqId.asc().nullsLast().op("text_ops")), - uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")), -]); - -export const relationsProducer = pgTable("relations_producer", { - id: integer().default(sql`nextval('relations_producer_id_seq'::regclass)`).primaryKey().notNull(), - songId: integer("song_id").notNull(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - producerId: integer("producer_id").notNull(), updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), }, (table) => [ + foreignKey({ + columns: [table.singerId], + foreignColumns: [singer.id], + name: "fkey_singer_id" + }), foreignKey({ columns: [table.songId], foreignColumns: [songs.id], - name: "fkey_relations_producer_songs_id" + name: "fkey_song_id" }), + check("relation_singer_id_not_null", sql`NOT NULL id`), + check("relation_singer_song_id_not_null", sql`NOT NULL song_id`), + check("relation_singer_singer_id_not_null", sql`NOT NULL singer_id`), + check("relation_singer_created_at_not_null", sql`NOT NULL created_at`), + check("relation_singer_updated_at_not_null", sql`NOT NULL updated_at`), +]); + +export const relations = pgTable("relations", { + id: integer().notNull(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + sourceId: bigint("source_id", { mode: "number" }).notNull(), + sourceType: text("source_type").notNull(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + targetId: bigint("target_id", { mode: "number" }).notNull(), + targetType: text("target_type").notNull(), + relation: text().notNull(), + createdAt: timestamp("created_at", { precision: 6, withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + updatedAt: timestamp("updated_at", { precision: 6, withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), +}, (table) => [ + check("relations_id_not_null", sql`NOT NULL id`), + check("relations_source_id_not_null", sql`NOT NULL source_id`), + check("relations_source_type_not_null", sql`NOT NULL source_type`), + check("relations_target_id_not_null", sql`NOT NULL target_id`), + check("relations_target_type_not_null", sql`NOT NULL target_type`), + check("relations_relation_not_null", sql`NOT NULL relation`), + check("relations_created_at_not_null", sql`NOT NULL created_at`), + check("relations_updated_at_not_null", sql`NOT NULL updated_at`), ]); diff --git a/packages/core/drizzle/outerSchema.ts b/packages/core/drizzle/type.ts similarity index 100% rename from packages/core/drizzle/outerSchema.ts rename to packages/core/drizzle/type.ts diff --git a/packages/core/lib/index.ts b/packages/core/lib/index.ts index 76ccac8..4bf52b9 100644 --- a/packages/core/lib/index.ts +++ b/packages/core/lib/index.ts @@ -2,3 +2,4 @@ export * from "./math"; export * from "./milestone"; export * from "./randomID"; export * from "./time"; +export * from "./type"; \ No newline at end of file diff --git a/packages/core/lib/type.ts b/packages/core/lib/type.ts new file mode 100644 index 0000000..c68be82 --- /dev/null +++ b/packages/core/lib/type.ts @@ -0,0 +1 @@ +export type PartialBy = Omit & Partial>; \ No newline at end of file diff --git a/packages/crawler/db/bilibili_metadata.ts b/packages/crawler/db/bilibili_metadata.ts index c009bdd..b79b92e 100644 --- a/packages/crawler/db/bilibili_metadata.ts +++ b/packages/crawler/db/bilibili_metadata.ts @@ -1,67 +1,95 @@ -import type { Psql } from "@core/db/psql.d"; -import { BiliUserType, BiliVideoMetadataType } from "@core/db/schema"; +import { + bilibiliMetadata, + BilibiliMetadataType, + bilibiliUser, + db, + labellingResult +} from "@core/drizzle"; import { AkariModelVersion } from "ml/const"; +import { eq, isNull } from "drizzle-orm"; +import { PartialBy } from "@core/lib"; -export async function videoExistsInAllData(sql: Psql, aid: number) { - const rows = await sql<{ exists: boolean }[]>` - SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = ${aid}) - `; - return rows[0].exists; +export async function insertIntoMetadata( + data: PartialBy +) { + await db.insert(bilibiliMetadata).values(data); } -export async function userExistsInBiliUsers(sql: Psql, uid: number) { - const rows = await sql<{ exists: boolean }[]>` - SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = ${uid}) - `; - return rows[0].exists; +export async function videoExistsInAllData(aid: number) { + const rows = await db + .select({ + id: bilibiliMetadata.id + }) + .from(bilibiliMetadata) + .where(eq(bilibiliMetadata.aid, aid)) + .limit(1); + + return rows.length > 0; } -export async function getUnlabelledVideos(sql: Psql) { - const rows = await sql<{ aid: number }[]>` - SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL - `; +export async function userExistsInBiliUsers(uid: number) { + const rows = await db + .select({ id: bilibiliUser.id }) + .from(bilibiliUser) + .where(eq(bilibiliUser.uid, uid)) + .limit(1); + + return rows.length > 0; +} + +export async function getUnlabelledVideos() { + const rows = await db + .select({ aid: bilibiliMetadata.aid }) + .from(bilibiliMetadata) + .leftJoin(labellingResult, eq(bilibiliMetadata.aid, labellingResult.aid)) + .where(isNull(labellingResult.aid)); + return rows.map((row) => row.aid); } -export async function insertVideoLabel(sql: Psql, aid: number, label: number) { - await sql` - INSERT INTO labelling_result (aid, label, model_version) VALUES (${aid}, ${label}, ${AkariModelVersion}) ON CONFLICT (aid, model_version) DO NOTHING - `; +export async function insertVideoLabel(aid: number, label: number) { + await db + .insert(labellingResult) + .values({ + aid, + label, + modelVersion: AkariModelVersion + }) + .onConflictDoNothing({ + target: [labellingResult.aid, labellingResult.modelVersion] + }); } -export async function getVideoInfoFromAllData(sql: Psql, aid: number) { - const rows = await sql` - SELECT * FROM bilibili_metadata WHERE aid = ${aid} - `; - const row = rows[0]; - let authorInfo = ""; - if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) { - const userRows = await sql` - SELECT * FROM bilibili_user WHERE uid = ${row.uid} - `; - const userRow = userRows[0]; - if (userRow) { - authorInfo = userRow.desc; - } +export async function getVideoInfoFromAllData(aid: number) { + const rows = await db + .select() + .from(bilibiliMetadata) + .where(eq(bilibiliMetadata.aid, aid)) + .limit(1); + + if (rows.length === 0) { + return null; } + + const row = rows[0]; return { title: row.title, description: row.description, - tags: row.tags, - author_info: authorInfo + tags: row.tags }; } -export async function setBiliVideoStatus(sql: Psql, aid: number, status: number) { - await sql` - UPDATE bilibili_metadata SET status = ${status} WHERE aid = ${aid} - `; +export async function setBiliVideoStatus(aid: number, status: number) { + await db.update(bilibiliMetadata).set({ status }).where(eq(bilibiliMetadata.aid, aid)); } -export async function getBiliVideoStatus(sql: Psql, aid: number) { - const rows = await sql<{ status: number }[]>` - SELECT status FROM bilibili_metadata WHERE aid = ${aid} - `; +export async function getBiliVideoStatus(aid: number) { + const rows = await db + .select({ status: bilibiliMetadata.status }) + .from(bilibiliMetadata) + .where(eq(bilibiliMetadata.aid, aid)) + .limit(1); + if (rows.length === 0) return 0; return rows[0].status; } diff --git a/packages/crawler/db/snapshot.ts b/packages/crawler/db/snapshot.ts index b528baf..3b6df94 100644 --- a/packages/crawler/db/snapshot.ts +++ b/packages/crawler/db/snapshot.ts @@ -1,30 +1,56 @@ import { LatestSnapshotType } from "@core/db/schema"; import { SnapshotNumber } from "mq/task/getVideoStats"; import type { Psql } from "@core/db/psql.d"; +import { + db, + eta, + latestVideoSnapshot as lv, + songs, + videoSnapshot, + VideoSnapshotType +} from "@core/drizzle"; +import { PartialBy } from "@core/lib"; +import { and, eq, getTableColumns, gte, lte, lt, or } from "drizzle-orm"; +import { union } from "drizzle-orm/pg-core"; +import { sql } from "drizzle-orm"; -export async function getVideosNearMilestone(sql: Psql) { - const queryResult = await sql` - SELECT ls.* - FROM latest_video_snapshot ls - RIGHT JOIN songs ON songs.aid = ls.aid - WHERE - (views >= 60000 AND views < 100000) OR - (views >= 900000 AND views < 1000000) OR - views > 1000000 - UNION - SELECT ls.* - FROM latest_video_snapshot ls - WHERE - (views >= 90000 AND views < 100000) OR - (views >= 900000 AND views < 1000000) OR - (views >= CEIL(views::float/1000000::float)*1000000-100000 AND views < CEIL(views::float/1000000::float)*1000000) - UNION - SELECT ls.* - FROM latest_video_snapshot ls - JOIN eta ON eta.aid = ls.aid - WHERE eta.eta < 2300 - `; - return queryResult.map((row) => { +export async function insertVideoSnapshot(data: PartialBy) { + await db.insert(videoSnapshot).values(data); +} + +export async function getVideosNearMilestone() { + const results = await union( + db + .select({ ...getTableColumns(lv) }) + .from(lv) + .rightJoin(songs, eq(lv.aid, songs.aid)) + .where( + or( + and(gte(lv.views, 60000), lt(lv.views, 100000)), + and(gte(lv.views, 900000), lt(lv.views, 1000000)), + gte(lv.views, 1000000) + ) + ), + db + .select({ ...getTableColumns(lv) }) + .from(lv) + .where( + or( + and(gte(lv.views, 60000), lt(lv.views, 100000)), + and(gte(lv.views, 900000), lt(lv.views, 1000000)), + and( + sql`views >= CEIL(views::float/1000000::float)*1000000-100000`, + sql`views < CEIL(views::float/1000000::float)*1000000)` + ) + ) + ), + db + .select({ ...getTableColumns(lv) }) + .from(lv) + .innerJoin(eta, eq(lv.aid, eta.aid)) + .where(lte(eta.eta, 2300)) + ); + return results.map((row) => { return { ...row, aid: Number(row.aid) @@ -32,7 +58,10 @@ export async function getVideosNearMilestone(sql: Psql) { }); } -export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise { +export async function getLatestVideoSnapshot( + sql: Psql, + aid: number +): Promise { const queryResult = await sql` SELECT * FROM latest_video_snapshot diff --git a/packages/crawler/ml/akari_api.ts b/packages/crawler/ml/akari_api.ts index c00792e..1705d5f 100644 --- a/packages/crawler/ml/akari_api.ts +++ b/packages/crawler/ml/akari_api.ts @@ -3,63 +3,63 @@ import logger from "@core/log"; import { WorkerError } from "mq/schema"; class AkariAPI { - private readonly serviceReady: Promise; + private readonly serviceReady: Promise; - constructor() { - // Wait for the ML API service to be ready on startup - this.serviceReady = apiManager.waitForService(); - } + constructor() { + // Wait for the ML API service to be ready on startup + this.serviceReady = apiManager.waitForService(); + } - public async init(): Promise { - const isReady = await this.serviceReady; - if (!isReady) { - throw new WorkerError( - new Error("ML API service failed to become ready"), - "ml", - "fn:init" - ); - } - logger.log("Akari API initialized successfully", "ml"); - } + public async init(): Promise { + const isReady = await this.serviceReady; + if (!isReady) { + throw new WorkerError( + new Error("ML API service failed to become ready"), + "ml", + "fn:init" + ); + } + logger.log("Akari API initialized successfully", "ml"); + } - public async classifyVideo( - title: string, - description: string, - tags: string, - aid?: number - ): Promise { - try { - // Ensure service is ready - await this.serviceReady; - - const label = await apiManager.classifyVideo(title, description, tags, aid); - return label; - } catch (error) { - logger.error(`Classification failed for aid ${aid}: ${error}`, "ml"); - throw new WorkerError(error as Error, "ml", "fn:classifyVideo"); - } - } + public async classifyVideo( + title: string, + description: string, + tags: string, + aid?: number + ): Promise { + try { + // Ensure service is ready + await this.serviceReady; - public async classifyVideosBatch( - videos: Array<{ title: string; description: string; tags: string; aid?: number }> - ): Promise> { - try { - // Ensure service is ready - await this.serviceReady; - - const results = await apiManager.classifyVideosBatch(videos); - return results; - } catch (error) { - logger.error(`Batch classification failed: ${error}`, "ml"); - throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch"); - } - } + const label = await apiManager.classifyVideo(title, description, tags, aid); + return label; + } catch (error) { + logger.error(`Classification failed for aid ${aid}: ${error}`, "ml"); + throw new WorkerError(error as Error, "ml", "fn:classifyVideo"); + } + } - public async healthCheck(): Promise { - return await apiManager.healthCheck(); - } + public async classifyVideosBatch( + videos: Array<{ title: string; description: string; tags: string; aid?: number }> + ): Promise> { + try { + // Ensure service is ready + await this.serviceReady; + + const results = await apiManager.classifyVideosBatch(videos); + return results; + } catch (error) { + logger.error(`Batch classification failed: ${error}`, "ml"); + throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch"); + } + } + + public async healthCheck(): Promise { + return await apiManager.healthCheck(); + } } // Create a singleton instance const Akari = new AkariAPI(); -export default Akari; \ No newline at end of file +export default Akari; diff --git a/packages/crawler/ml/api_manager.ts b/packages/crawler/ml/api_manager.ts index 0904a12..a461c2c 100644 --- a/packages/crawler/ml/api_manager.ts +++ b/packages/crawler/ml/api_manager.ts @@ -2,145 +2,145 @@ import logger from "@core/log"; import { WorkerError } from "mq/schema"; interface ClassificationRequest { - title: string; - description: string; - tags: string; - aid?: number; + title: string; + description: string; + tags: string; + aid?: number; } interface ClassificationResponse { - label: number; - probabilities: number[]; - aid?: number; + label: number; + probabilities: number[]; + aid?: number; } interface HealthResponse { - status: string; - models_loaded: boolean; + status: string; + models_loaded: boolean; } export class APIManager { - private readonly baseUrl: string; - private readonly timeout: number; + private readonly baseUrl: string; + private readonly timeout: number; - constructor(baseUrl: string = "http://localhost:8544", timeout: number = 30000) { - this.baseUrl = baseUrl; - this.timeout = timeout; - } + constructor(baseUrl: string = "http://localhost:8544", timeout: number = 30000) { + this.baseUrl = baseUrl; + this.timeout = timeout; + } - public async healthCheck(): Promise { - try { - const response = await fetch(`${this.baseUrl}/health`, { - method: 'GET', - headers: { - 'Content-Type': 'application/json', - }, - signal: AbortSignal.timeout(this.timeout), - }); + public async healthCheck(): Promise { + try { + const response = await fetch(`${this.baseUrl}/health`, { + method: "GET", + headers: { + "Content-Type": "application/json" + }, + signal: AbortSignal.timeout(this.timeout) + }); - if (!response.ok) { - throw new Error(`HTTP ${response.status}: ${response.statusText}`); - } + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } - const data: HealthResponse = await response.json(); - return data.models_loaded; - } catch (error) { - logger.error(`Health check failed: ${error}`, "ml"); - return false; - } - } + const data: HealthResponse = await response.json(); + return data.models_loaded; + } catch (error) { + logger.error(`Health check failed: ${error}`, "ml"); + return false; + } + } - public async classifyVideo( - title: string, - description: string, - tags: string, - aid?: number - ): Promise { - const request: ClassificationRequest = { - title: title.trim() || "untitled", - description: description.trim() || "N/A", - tags: tags.trim() || "empty", - aid: aid - }; + public async classifyVideo( + title: string, + description: string, + tags: string, + aid?: number + ): Promise { + const request: ClassificationRequest = { + title: title.trim() || "untitled", + description: description.trim() || "N/A", + tags: tags.trim() || "empty", + aid: aid + }; - try { - const response = await fetch(`${this.baseUrl}/classify`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(request), - signal: AbortSignal.timeout(this.timeout), - }); + try { + const response = await fetch(`${this.baseUrl}/classify`, { + method: "POST", + headers: { + "Content-Type": "application/json" + }, + body: JSON.stringify(request), + signal: AbortSignal.timeout(this.timeout) + }); - if (!response.ok) { - throw new Error(`HTTP ${response.status}: ${response.statusText}`); - } + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } - const data: ClassificationResponse = await response.json(); - - if (aid) { - logger.log( - `Prediction result for aid: ${aid}: [${data.probabilities.map((p) => p.toFixed(5))}]`, - "ml" - ); - } + const data: ClassificationResponse = await response.json(); - return data.label; - } catch (error) { - logger.error(`Classification failed for aid ${aid}: ${error}`, "ml"); - throw new WorkerError(error as Error, "ml", "fn:classifyVideo"); - } - } + if (aid) { + logger.log( + `Prediction result for aid: ${aid}: [${data.probabilities.map((p) => p.toFixed(5))}]`, + "ml" + ); + } - public async classifyVideosBatch( - requests: Array<{ title: string; description: string; tags: string; aid?: number }> - ): Promise> { - try { - const response = await fetch(`${this.baseUrl}/classify_batch`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(requests), - signal: AbortSignal.timeout(this.timeout * 2), // Longer timeout for batch - }); + return data.label; + } catch (error) { + logger.error(`Classification failed for aid ${aid}: ${error}`, "ml"); + throw new WorkerError(error as Error, "ml", "fn:classifyVideo"); + } + } - if (!response.ok) { - throw new Error(`HTTP ${response.status}: ${response.statusText}`); - } + public async classifyVideosBatch( + requests: Array<{ title: string; description: string; tags: string; aid?: number }> + ): Promise> { + try { + const response = await fetch(`${this.baseUrl}/classify_batch`, { + method: "POST", + headers: { + "Content-Type": "application/json" + }, + body: JSON.stringify(requests), + signal: AbortSignal.timeout(this.timeout * 2) // Longer timeout for batch + }); - const data = await response.json(); - return data.results; - } catch (error) { - logger.error(`Batch classification failed: ${error}`, "ml"); - throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch"); - } - } + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } - public async waitForService(timeoutMs: number = 60000): Promise { - const startTime = Date.now(); - const checkInterval = 2000; // Check every 2 seconds + const data = await response.json(); + return data.results; + } catch (error) { + logger.error(`Batch classification failed: ${error}`, "ml"); + throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch"); + } + } - while (Date.now() - startTime < timeoutMs) { - try { - const isHealthy = await this.healthCheck(); - if (isHealthy) { - logger.log("ML API service is healthy", "ml"); - return true; - } - } catch (error) { - // Service not ready yet, continue waiting - } + public async waitForService(timeoutMs: number = 60000): Promise { + const startTime = Date.now(); + const checkInterval = 2000; // Check every 2 seconds - await new Promise(resolve => setTimeout(resolve, checkInterval)); - } + while (Date.now() - startTime < timeoutMs) { + try { + const isHealthy = await this.healthCheck(); + if (isHealthy) { + logger.log("ML API service is healthy", "ml"); + return true; + } + } catch (error) { + // Service not ready yet, continue waiting + } - logger.error("ML API service did not become ready within timeout", "ml"); - return false; - } + await new Promise((resolve) => setTimeout(resolve, checkInterval)); + } + + logger.error("ML API service did not become ready within timeout", "ml"); + return false; + } } // Create a singleton instance const apiManager = new APIManager(); -export default apiManager; \ No newline at end of file +export default apiManager; diff --git a/packages/crawler/ml/manager.ts b/packages/crawler/ml/manager.ts index eefcbaa..ff6c045 100644 --- a/packages/crawler/ml/manager.ts +++ b/packages/crawler/ml/manager.ts @@ -22,7 +22,11 @@ export class AIManager { public getModelSession(key: string): ort.InferenceSession { if (this.sessions[key] === undefined) { - throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession"); + throw new WorkerError( + new Error(`Model ${key} not found / not initialized.`), + "ml", + "fn:getModelSession" + ); } return this.sessions[key]; } diff --git a/packages/crawler/mq/exec/classifyVideo.ts b/packages/crawler/mq/exec/classifyVideo.ts index bd0ec44..c57c332 100644 --- a/packages/crawler/mq/exec/classifyVideo.ts +++ b/packages/crawler/mq/exec/classifyVideo.ts @@ -1,5 +1,9 @@ import { Job } from "bullmq"; -import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata"; +import { + getUnlabelledVideos, + getVideoInfoFromAllData, + insertVideoLabel +} from "../../db/bilibili_metadata"; import Akari from "ml/akari_api"; import { ClassifyVideoQueue } from "mq/index"; import logger from "@core/log"; @@ -16,20 +20,23 @@ export const classifyVideoWorker = async (job: Job) => { return 3; } - const videoInfo = await getVideoInfoFromAllData(sql, aid); - const title = videoInfo.title?.trim() || "untitled"; - const description = videoInfo.description?.trim() || "N/A"; + const videoInfo = await getVideoInfoFromAllData(aid); + if (!videoInfo) { + return 3; + } + const title = videoInfo.title.trim(); + const description = videoInfo.description.trim(); const tags = videoInfo.tags?.trim() || "empty"; const label = await Akari.classifyVideo(title, description, tags, aid); if (label == -1) { logger.warn(`Failed to classify video ${aid}`, "ml"); } - await insertVideoLabel(sql, aid, label); + await insertVideoLabel(aid, label); const exists = await aidExistsInSongs(sql, aid); if (!exists && label !== 0) { await scheduleSnapshot(sql, aid, "new", Date.now() + 10 * MINUTE, true); - await insertIntoSongs(sql, aid); + await insertIntoSongs(aid); } await job.updateData({ @@ -48,7 +55,7 @@ export const classifyVideosWorker = async () => { await lockManager.acquireLock("classifyVideos", 5 * 60); - const videos = await getUnlabelledVideos(sql); + const videos = await getUnlabelledVideos(); logger.log(`Found ${videos.length} unlabelled videos`); const startTime = new Date().getTime(); diff --git a/packages/crawler/mq/exec/collectQueueMetrics.ts b/packages/crawler/mq/exec/collectQueueMetrics.ts index 03dbf9b..0119249 100644 --- a/packages/crawler/mq/exec/collectQueueMetrics.ts +++ b/packages/crawler/mq/exec/collectQueueMetrics.ts @@ -2,17 +2,19 @@ import { queueJobsCounter } from "metrics"; import { SnapshotQueue } from "mq"; export const collectQueueMetrics = async () => { - const counts = await SnapshotQueue.getJobCounts(); + const counts = await SnapshotQueue.getJobCounts(); const waiting = counts?.waiting; const prioritized = counts?.prioritized; - const active = counts?.active; - const completed = counts?.completed; - const failed = counts?.failed; - const delayed = counts?.delayed; - waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" }); - prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" }); - active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" }); - completed && queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" }); - failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" }); - delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" }); -} \ No newline at end of file + const active = counts?.active; + const completed = counts?.completed; + const failed = counts?.failed; + const delayed = counts?.delayed; + waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" }); + prioritized && + queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" }); + active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" }); + completed && + queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" }); + failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" }); + delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" }); +}; diff --git a/packages/crawler/mq/exec/directSnapshot.ts b/packages/crawler/mq/exec/directSnapshot.ts index c288591..f6e1f57 100644 --- a/packages/crawler/mq/exec/directSnapshot.ts +++ b/packages/crawler/mq/exec/directSnapshot.ts @@ -4,14 +4,14 @@ import { sql } from "@core/db/dbNew"; import { lockManager } from "@core/mq/lockManager"; export const directSnapshotWorker = async (job: Job): Promise => { - const lock = await lockManager.isLocked(`directSnapshot-${job.data.aid}`); - if (lock) { - return; - } + const lock = await lockManager.isLocked(`directSnapshot-${job.data.aid}`); + if (lock) { + return; + } const aid = job.data.aid; if (!aid) { throw new Error("aid does not exists"); } await insertVideoSnapshot(sql, aid, "snapshotMilestoneVideo"); - await lockManager.acquireLock(`directSnapshot-${job.data.aid}`, 75); + await lockManager.acquireLock(`directSnapshot-${job.data.aid}`, 75); }; diff --git a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts index d0ef8ba..bfc744a 100644 --- a/packages/crawler/mq/exec/dispatchRegularSnapshots.ts +++ b/packages/crawler/mq/exec/dispatchRegularSnapshots.ts @@ -1,7 +1,10 @@ import { Job } from "bullmq"; import { getLatestVideoSnapshot } from "db/snapshot"; import { truncate } from "utils/truncate"; -import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule"; +import { + getVideosWithoutActiveSnapshotScheduleByType, + scheduleSnapshot +} from "db/snapshotSchedule"; import logger from "@core/log"; import { HOUR, MINUTE, WEEK } from "@core/lib"; import { lockManager } from "@core/mq/lockManager"; @@ -25,7 +28,11 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise = const lastSnapshotedAt = latestSnapshot?.time ?? now; const interval = await getRegularSnapshotInterval(sql, aid); logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq"); - const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK); + const targetTime = truncate( + lastSnapshotedAt + interval * HOUR, + now + 1, + now + 100000 * WEEK + ); await scheduleSnapshot(sql, aid, "normal", targetTime); if (now - startedAt > 25 * MINUTE) { return; diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts index 403c7df..73ca304 100644 --- a/packages/crawler/mq/exec/getVideoInfo.ts +++ b/packages/crawler/mq/exec/getVideoInfo.ts @@ -1,14 +1,109 @@ import { Job } from "bullmq"; -import { insertVideoInfo } from "mq/task/getVideoDetails"; +import { getVideoDetails } from "net/getVideoDetails"; import logger from "@core/log"; -import { sql } from "@core/db/dbNew"; +import { ClassifyVideoQueue, latestVideosEventsProducer } from "mq/index"; +import { + insertIntoMetadata, + userExistsInBiliUsers, + videoExistsInAllData +} from "db/bilibili_metadata"; +import { insertIntoSongs } from "mq/task/collectSongs"; +import { bilibiliUser, db, videoSnapshot } from "@core/drizzle"; +import { eq } from "drizzle-orm"; +import { GetVideoInfoJobData } from "mq/schema"; -export const getVideoInfoWorker = async (job: Job): Promise => { +interface AddSongEventPayload { + eventName: string; + uid: string; + songID: number; +} + +const publishAddsongEvent = async (songID: number, uid: string) => + latestVideosEventsProducer.publishEvent({ + eventName: "addSong", + uid: uid, + songID: songID + }); + +export const getVideoInfoWorker = async (job: Job): Promise => { const aid = job.data.aid; - const insert = job.data.insertSongs || false; + const insertSongs = job.data.insertSongs || false; if (!aid) { logger.warn("aid does not exists", "mq", "job:getVideoInfo"); return; } - await insertVideoInfo(sql, aid, insert); + const videoExists = await videoExistsInAllData(aid); + if (videoExists && !insertSongs) { + return; + } + if (videoExists && insertSongs) { + const songs = await insertIntoSongs(aid); + if (songs.length === 0) { + logger.warn(`Failed to insert song for aid: ${aid}`, "mq", "fn:getVideoInfoWorker"); + return; + } + await publishAddsongEvent(songs[0].id, job.data.uid); + return; + } + const data = await getVideoDetails(aid); + if (data === null) { + return null; + } + + const uid = data.View.owner.mid; + + await insertIntoMetadata({ + aid, + bvid: data.View.bvid, + description: data.View.desc, + uid: uid, + tags: data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) + .map((tag) => tag.tag_name) + .join(","), + title: data.View.title, + publishedAt: new Date(data.View.pubdate).toISOString(), + duration: data.View.duration, + coverUrl: data.View.pic + }); + + const userExists = await userExistsInBiliUsers(aid); + if (!userExists) { + await db.insert(bilibiliUser).values({ + uid, + username: data.View.owner.name, + desc: data.Card.card.sign, + fans: data.Card.follower + }); + } else { + await db + .update(bilibiliUser) + .set({ username: data.View.owner.name, desc: data.Card.card.sign }) + .where(eq(bilibiliUser.uid, uid)); + } + + const stat = data.View.stat; + + await db.insert(videoSnapshot).values({ + aid, + views: stat.view, + danmakus: stat.danmaku, + replies: stat.reply, + likes: stat.like, + coins: stat.coin, + shares: stat.share, + favorites: stat.favorite + }); + + logger.log(`Inserted video metadata for aid: ${aid}`, "mq"); + + if (!insertSongs) { + await ClassifyVideoQueue.add("classifyVideo", { aid }); + return; + } + const songs = await insertIntoSongs(aid); + if (songs.length === 0) { + logger.warn(`Failed to insert song for aid: ${aid}`, "mq", "fn:getVideoInfoWorker"); + return; + } + await publishAddsongEvent(songs[0].id, job.data.uid); }; diff --git a/packages/crawler/mq/index.ts b/packages/crawler/mq/index.ts index 0d90829..00f715a 100644 --- a/packages/crawler/mq/index.ts +++ b/packages/crawler/mq/index.ts @@ -1,5 +1,5 @@ -import { Queue, ConnectionOptions } from "bullmq"; -import { redis } from "@core/db/redis"; +import { Queue, ConnectionOptions, QueueEventsProducer } from "bullmq"; +import { redis } from "bun"; export const LatestVideosQueue = new Queue("latestVideos", { connection: redis as ConnectionOptions @@ -15,4 +15,8 @@ export const SnapshotQueue = new Queue("snapshot", { export const MiscQueue = new Queue("misc", { connection: redis as ConnectionOptions -}); \ No newline at end of file +}); + +export const latestVideosEventsProducer = new QueueEventsProducer("latestVideos", { + connection: redis as ConnectionOptions +}); diff --git a/packages/crawler/mq/schema.ts b/packages/crawler/mq/schema.ts index 07e4033..53bef2f 100644 --- a/packages/crawler/mq/schema.ts +++ b/packages/crawler/mq/schema.ts @@ -10,3 +10,9 @@ export class WorkerError extends Error { this.rawError = rawError; } } + +export interface GetVideoInfoJobData { + aid: number; + insertSongs?: boolean; + uid?: string; +} diff --git a/packages/crawler/mq/task/collectSongs.ts b/packages/crawler/mq/task/collectSongs.ts index e16f575..894fb79 100644 --- a/packages/crawler/mq/task/collectSongs.ts +++ b/packages/crawler/mq/task/collectSongs.ts @@ -4,49 +4,53 @@ import logger from "@core/log"; import { scheduleSnapshot } from "db/snapshotSchedule"; import { MINUTE } from "@core/lib"; import type { Psql } from "@core/db/psql.d"; +import { db, songs } from "@core/drizzle"; +import { and, eq, sql as drizzleSQL } from "drizzle-orm"; export async function collectSongs() { const aids = await getNotCollectedSongs(sql); for (const aid of aids) { const exists = await aidExistsInSongs(sql, aid); if (exists) continue; - await insertIntoSongs(sql, aid); + await insertIntoSongs(aid); await scheduleSnapshot(sql, aid, "new", Date.now() + 10 * MINUTE, true); logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs"); } } -export async function insertIntoSongs(sql: Psql, aid: number) { - const songExistsAndDeleted = await sql` - SELECT EXISTS ( - SELECT 1 - FROM songs - WHERE aid = ${aid} - AND deleted = true - ); - `; - if (songExistsAndDeleted[0].exists) { - await sql` - UPDATE songs - SET deleted = false - WHERE aid = ${aid} - `; +export async function insertIntoSongs(aid: number) { + const song = await db + .select({ id: songs.id }) + .from(songs) + .where(and(eq(songs.aid, aid), eq(songs.deleted, true))) + .limit(1); + const songExistsAndDeleted = song.length > 0; + + if (songExistsAndDeleted) { + const data = await db + .update(songs) + .set({ deleted: false }) + .where(eq(songs.id, song[0].id)) + .returning(); + return data; } - await sql` - INSERT INTO songs (aid, published_at, duration, image, producer) - VALUES ( - $1, - (SELECT published_at FROM bilibili_metadata WHERE aid = ${aid}), - (SELECT duration FROM bilibili_metadata WHERE aid = ${aid}), - (SELECT cover_url FROM bilibili_metadata WHERE aid = ${aid}), - ( - SELECT username - FROM bilibili_user bu - JOIN bilibili_metadata bm - ON bm.uid = bu.uid - WHERE bm.aid = ${aid} - ) - ) - ON CONFLICT DO NOTHING - `; + const data = await db + .insert(songs) + .values({ + aid, + publishedAt: drizzleSQL`SELECT published_at FROM bilibili_metadata WHERE aid = ${aid}`, + duration: drizzleSQL`SELECT duration FROM bilibili_metadata WHERE aid = ${aid}`, + image: drizzleSQL`SELECT cover_url FROM bilibili_metadata WHERE aid = ${aid}`, + producer: drizzleSQL` + SELECT username + FROM bilibili_user bu + JOIN bilibili_metadata bm + ON bm.uid = bu.uid + WHERE bm.aid = ${aid} + ` + }) + .onConflictDoNothing() + .returning(); + + return data; } diff --git a/packages/crawler/mq/task/getVideoDetails.ts b/packages/crawler/mq/task/getVideoDetails.ts deleted file mode 100644 index 411482b..0000000 --- a/packages/crawler/mq/task/getVideoDetails.ts +++ /dev/null @@ -1,72 +0,0 @@ -import { getVideoDetails } from "net/getVideoDetails"; -import { formatTimestampToPsql } from "utils/formatTimestampToPostgre"; -import logger from "@core/log"; -import { ClassifyVideoQueue } from "mq/index"; -import { userExistsInBiliUsers, videoExistsInAllData } from "db/bilibili_metadata"; -import { HOUR, SECOND } from "@core/lib"; -import type { Psql } from "@core/db/psql.d"; -import { insertIntoSongs } from "./collectSongs"; - -export async function insertVideoInfo(sql: Psql, aid: number, insertSongs = false) { - const videoExists = await videoExistsInAllData(sql, aid); - if (videoExists && !insertSongs) { - return; - } - if (videoExists && insertSongs) { - await insertIntoSongs(sql, aid); - return; - } - const data = await getVideoDetails(aid); - if (data === null) { - return null; - } - const bvid = data.View.bvid; - const desc = data.View.desc; - const uid = data.View.owner.mid; - const tags = data.Tags.filter((tag) => !["old_channel", "topic"].indexOf(tag.tag_type)) - .map((tag) => tag.tag_name) - .join(","); - const title = data.View.title; - const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR); - const duration = data.View.duration; - const cover = data.View.pic; - await sql` - INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration, cover_url) - VALUES (${aid}, ${bvid}, ${desc}, ${uid}, ${tags}, ${title}, ${published_at}, ${duration}, ${cover}) - `; - const userExists = await userExistsInBiliUsers(sql, aid); - if (!userExists) { - await sql` - INSERT INTO bilibili_user (uid, username, "desc", fans) - VALUES (${uid}, ${data.View.owner.name}, ${data.Card.card.sign}, ${data.Card.follower}) - `; - } else { - await sql` - UPDATE bilibili_user SET fans = ${data.Card.follower} WHERE uid = ${uid} - `; - } - - const stat = data.View.stat; - - await sql` - INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites) - VALUES ( - ${aid}, - ${stat.view}, - ${stat.danmaku}, - ${stat.reply}, - ${stat.like}, - ${stat.coin}, - ${stat.share}, - ${stat.favorite} - ) - `; - - logger.log(`Inserted video metadata for aid: ${aid}`, "mq"); - - if (!insertSongs) { - await ClassifyVideoQueue.add("classifyVideo", { aid }); - return; - } - await insertIntoSongs(sql, aid); -} diff --git a/packages/crawler/mq/task/queueLatestVideo.ts b/packages/crawler/mq/task/queueLatestVideo.ts index 45d0e02..5aad8d2 100644 --- a/packages/crawler/mq/task/queueLatestVideo.ts +++ b/packages/crawler/mq/task/queueLatestVideo.ts @@ -20,7 +20,7 @@ export async function queueLatestVideos(sql: Psql): Promise { let allExists = true; let delay = 0; for (const aid of aids) { - const videoExists = await videoExistsInAllData(sql, aid); + const videoExists = await videoExistsInAllData(aid); if (videoExists) { continue; } diff --git a/packages/crawler/net/getLatestVideoAids.ts b/packages/crawler/net/getLatestVideoAids.ts index bbb8673..65137e3 100644 --- a/packages/crawler/net/getLatestVideoAids.ts +++ b/packages/crawler/net/getLatestVideoAids.ts @@ -2,7 +2,10 @@ import type { VideoListResponse } from "@core/net/bilibili.d"; import logger from "@core/log"; import networkDelegate from "@core/net/delegate"; -export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise { +export async function getLatestVideoAids( + page: number = 1, + pageSize: number = 10 +): Promise { const startFrom = 1 + pageSize * (page - 1); const endTo = pageSize * page; const range = `${startFrom}-${endTo}`; diff --git a/packages/crawler/net/getVideoDetails.ts b/packages/crawler/net/getVideoDetails.ts index 414b421..67f5b5d 100644 --- a/packages/crawler/net/getVideoDetails.ts +++ b/packages/crawler/net/getVideoDetails.ts @@ -2,9 +2,15 @@ import networkDelegate from "@core/net/delegate"; import type { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d"; import logger from "@core/log"; -export async function getVideoDetails(aid: number, archive: boolean = false): Promise { +export async function getVideoDetails( + aid: number, + archive: boolean = false +): Promise { const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`; - const { data } = await networkDelegate.request(url, archive ? "" : "getVideoInfo"); + const { data } = await networkDelegate.request( + url, + archive ? "" : "getVideoInfo" + ); const errMessage = `Error fetching metadata for ${aid}:`; if (data.code !== 0) { logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo"); diff --git a/packages/crawler/src/filterWorker.ts b/packages/crawler/src/filterWorker.ts index 4a2983a..ae32204 100644 --- a/packages/crawler/src/filterWorker.ts +++ b/packages/crawler/src/filterWorker.ts @@ -12,7 +12,7 @@ const shutdown = async (signal: string, filterWorker: Worker) process.exit(0); }; -await Akari.init() +await Akari.init(); const filterWorker = new Worker( "classifyVideo", diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 110e4ce..c07e90a 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -78,7 +78,7 @@ const snapshotWorker = new Worker( "snapshot", async (job: Job) => { switch (job.name) { - case "directSnapshot": + case "directSnapshot": return await directSnapshotWorker(job); case "snapshotVideo": return await snapshotVideoWorker(job); @@ -124,4 +124,4 @@ const miscWorker = new Worker( miscWorker.on("error", (err) => { const e = err as WorkerError; logger.error(e.rawError, e.service, e.codePath); -}); \ No newline at end of file +});