1
0

ref: switch to drizzle, add history

This commit is contained in:
alikia2x (寒寒) 2025-11-16 20:14:13 +08:00
parent 965fe5ddc9
commit 30f8a2ffe8
WARNING! Although there is a key with this ID in the database it does not verify this commit! This commit is SUSPICIOUS.
GPG Key ID: 56209E0CCD8420C6
51 changed files with 1051 additions and 1357 deletions

View File

@ -1,7 +1,6 @@
import Argon2id from "@rabbit-company/argon2id";
import { dbMain } from "@core/drizzle";
import { usersInCredentials, loginSessionsInCredentials } from "@core/drizzle/main/schema";
import { eq, and, isNull } from "drizzle-orm";
import { db, usersInCredentials, loginSessionsInCredentials } from "@core/drizzle";
import { eq, and, isNull, getTableColumns } from "drizzle-orm";
import { generate as generateId } from "@alikia/random-key";
import logger from "@core/log";
@ -10,10 +9,11 @@ export interface User {
username: string;
nickname: string | null;
role: string;
unqId: string;
}
export async function verifyUser(username: string, password: string): Promise<User | null> {
const user = await dbMain
const user = await db
.select()
.from(usersInCredentials)
.where(eq(usersInCredentials.username, username))
@ -33,7 +33,8 @@ export async function verifyUser(username: string, password: string): Promise<Us
id: foundUser.id,
username: foundUser.username,
nickname: foundUser.nickname,
role: foundUser.role
role: foundUser.role,
unqId: foundUser.unqId
};
}
@ -48,7 +49,7 @@ export async function createSession(
expireAt.setDate(expireAt.getDate() + expiresInDays);
try {
await dbMain.insert(loginSessionsInCredentials).values({
await db.insert(loginSessionsInCredentials).values({
id: sessionId,
uid: userId,
ipAddress,
@ -67,9 +68,13 @@ export async function createSession(
export async function validateSession(
sessionId: string
): Promise<{ user: User; session: any } | null> {
const session = await dbMain
.select()
const session = await db
.select({
...getTableColumns(usersInCredentials),
...getTableColumns(loginSessionsInCredentials)
})
.from(loginSessionsInCredentials)
.innerJoin(usersInCredentials, eq(loginSessionsInCredentials.uid, usersInCredentials.id))
.where(
and(
eq(loginSessionsInCredentials.id, sessionId),
@ -88,7 +93,7 @@ export async function validateSession(
return null;
}
const user = await dbMain
const user = await db
.select()
.from(usersInCredentials)
.where(eq(usersInCredentials.id, foundSession.uid))
@ -98,24 +103,19 @@ export async function validateSession(
return null;
}
await dbMain
await db
.update(loginSessionsInCredentials)
.set({ lastUsedAt: new Date().toISOString() })
.where(eq(loginSessionsInCredentials.id, sessionId));
return {
user: {
id: user[0].id,
username: user[0].username,
nickname: user[0].nickname,
role: user[0].role
},
user: user[0],
session: foundSession
};
}
export async function deactivateSession(sessionId: string): Promise<boolean> {
const result = await dbMain
const result = await db
.update(loginSessionsInCredentials)
.set({
deactivatedAt: new Date().toISOString()

View File

@ -40,13 +40,12 @@ export function detectBiliID(id: string) {
return {
type: "bv" as const,
id: id as `BV1${string}`
}
}
else if (avSchema.safeParse(id).success) {
};
} else if (avSchema.safeParse(id).success) {
return {
type: "av" as const,
id: id as `av${string}`
}
};
}
return null;
}
@ -61,4 +60,4 @@ export function biliIDToAID(id: string) {
} else {
return Number.parseInt(detected.id.slice(2));
}
}
}

View File

@ -7,4 +7,4 @@ export const LatestVideosQueue = new Queue("latestVideos", {
export const SnapshotQueue = new Queue("snapshot", {
connection: redis as ConnectionOptions
});
});

View File

@ -1,34 +1,28 @@
import {
Elysia,
type MapResponse,
type Context,
type TraceEvent,
type TraceProcess
} from 'elysia'
import { Elysia, type MapResponse, type Context, type TraceEvent, type TraceProcess } from "elysia";
type MaybePromise<T> = T | Promise<T>
type MaybePromise<T> = T | Promise<T>;
class TimeLogger {
private startTimes: Map<string, number>
private durations: Map<string, number>
private totalStartTime: number | null
private startTimes: Map<string, number>;
private durations: Map<string, number>;
private totalStartTime: number | null;
constructor() {
this.startTimes = new Map()
this.durations = new Map()
this.totalStartTime = null
this.startTimes = new Map();
this.durations = new Map();
this.totalStartTime = null;
}
startTime(name: string) {
this.startTimes.set(name, performance.now())
this.startTimes.set(name, performance.now());
}
endTime(name: string) {
const startTime = this.startTimes.get(name)
const startTime = this.startTimes.get(name);
if (startTime !== undefined) {
const duration = performance.now() - startTime
this.durations.set(name, duration)
this.startTimes.delete(name)
const duration = performance.now() - startTime;
this.durations.set(name, duration);
this.startTimes.delete(name);
}
}
@ -36,16 +30,16 @@ class TimeLogger {
return Array.from(this.durations.entries()).map(([name, duration]) => ({
name,
duration
}))
}));
}
startTotal(): void {
this.totalStartTime = performance.now()
this.totalStartTime = performance.now();
}
endTotal(): number | null {
if (this.totalStartTime === null) return null
return performance.now() - this.totalStartTime
if (this.totalStartTime === null) return null;
return performance.now() - this.totalStartTime;
}
}
@ -53,7 +47,7 @@ export interface ServerTimingOptions {
/**
* Should Elysia report data back to client via 'Server-Sent-Event'
*/
report?: boolean
report?: boolean;
/**
* Allow Server Timing to log specified life-cycle events
*/
@ -63,107 +57,103 @@ export interface ServerTimingOptions {
*
* @default true
*/
request?: boolean
request?: boolean;
/**
* Capture duration from parse
*
* @default true
*/
parse?: boolean
parse?: boolean;
/**
* Capture duration from transform
*
* @default true
*/
transform?: boolean
transform?: boolean;
/**
* Capture duration from beforeHandle
*
* @default true
*/
beforeHandle?: boolean
beforeHandle?: boolean;
/**
* Capture duration from handle
*
* @default true
*/
handle?: boolean
handle?: boolean;
/**
* Capture duration from afterHandle
*
* @default true
*/
afterHandle?: boolean
afterHandle?: boolean;
/**
* Capture duration from mapResponse
*
* @default true
*/
error?: boolean
error?: boolean;
/**
* Capture duration from mapResponse
*
* @default true
*/
mapResponse?: boolean
mapResponse?: boolean;
/**
* Capture total duration from start to finish
*
* @default true
*/
total?: boolean
}
total?: boolean;
};
/**
* Determine whether or not Server Timing should be enabled
*
* @default NODE_ENV !== 'production'
*/
enabled?: boolean
enabled?: boolean;
/**
* A condition whether server timing should be log
*
* @default undefined
*/
allow?:
| MaybePromise<boolean>
| ((context: Omit<Context, 'path'>) => MaybePromise<boolean>)
allow?: MaybePromise<boolean> | ((context: Omit<Context, "path">) => MaybePromise<boolean>);
/**
* A custom mapResponse provided by user
*
* @default undefined
*/
mapResponse?: MapResponse
mapResponse?: MapResponse;
}
const getLabel = (
event: TraceEvent,
listener: (
callback: (process: TraceProcess<'begin', true>) => unknown
) => unknown,
listener: (callback: (process: TraceProcess<"begin", true>) => unknown) => unknown,
write: (value: string) => void
) => {
listener(async ({ onStop, onEvent, total }) => {
let label = ''
let label = "";
if (total === 0) return
if (total === 0) return;
onEvent(({ name, index, onStop }) => {
onStop(({ elapsed }) => {
label += `${event}.${index}.${name || 'anon'};dur=${elapsed},`
})
})
label += `${event}.${index}.${name || "anon"};dur=${elapsed},`;
});
});
onStop(({ elapsed }) => {
label += `${event};dur=${elapsed},`
label += `${event};dur=${elapsed},`;
write(label)
})
})
}
write(label);
});
});
};
export const serverTiming = ({
allow,
enabled = process.env.NODE_ENV !== 'production',
enabled = process.env.NODE_ENV !== "production",
trace: {
request: traceRequest = true,
parse: traceParse = true,
@ -177,8 +167,8 @@ export const serverTiming = ({
} = {},
mapResponse
}: ServerTimingOptions = {}) => {
const app = new Elysia().decorate('timeLog', new TimeLogger()).trace(
{ as: 'global' },
const app = new Elysia().decorate("timeLog", new TimeLogger()).trace(
{ as: "global" },
async ({
onRequest,
onParse,
@ -195,84 +185,77 @@ export const serverTiming = ({
request: { method }
}
}) => {
if (!enabled) return
let label = ''
if (!enabled) return;
let label = "";
const write = (nextValue: string) => {
label += nextValue
}
label += nextValue;
};
let start: number
let start: number;
onRequest(({ begin }) => {
context.timeLog.startTotal()
start = begin
})
context.timeLog.startTotal();
start = begin;
});
if (traceRequest) getLabel('request', onRequest, write)
if (traceParse) getLabel('parse', onParse, write)
if (traceTransform) getLabel('transform', onTransform, write)
if (traceBeforeHandle)
getLabel('beforeHandle', onBeforeHandle, write)
if (traceAfterHandle) getLabel('afterHandle', onAfterHandle, write)
if (traceError) getLabel('error', onError, write)
if (traceMapResponse) getLabel('mapResponse', onMapResponse, write)
if (traceRequest) getLabel("request", onRequest, write);
if (traceParse) getLabel("parse", onParse, write);
if (traceTransform) getLabel("transform", onTransform, write);
if (traceBeforeHandle) getLabel("beforeHandle", onBeforeHandle, write);
if (traceAfterHandle) getLabel("afterHandle", onAfterHandle, write);
if (traceError) getLabel("error", onError, write);
if (traceMapResponse) getLabel("mapResponse", onMapResponse, write);
if (traceHandle)
onHandle(({ name, onStop }) => {
onStop(({ elapsed }) => {
label += `handle.${name};dur=${elapsed},`
})
})
label += `handle.${name};dur=${elapsed},`;
});
});
onMapResponse(({ onStop }) => {
onStop(async ({ end }) => {
const completedDurations =
context.timeLog.getCompletedDurations()
const completedDurations = context.timeLog.getCompletedDurations();
if (completedDurations.length > 0) {
label +=
completedDurations
.map(
({ name, duration }) =>
`${name};dur=${duration}`
)
.join(', ') + ','
.map(({ name, duration }) => `${name};dur=${duration}`)
.join(", ") + ",";
}
const elapsed = context.timeLog.endTotal();
let allowed = allow
if (allowed instanceof Promise) allowed = await allowed
let allowed = allow;
if (allowed instanceof Promise) allowed = await allowed;
if (traceTotal) label += `total;dur=${elapsed}`
else label = label.slice(0, -1)
if (traceTotal) label += `total;dur=${elapsed}`;
else label = label.slice(0, -1);
// ? Must wait until request is reported
switch (typeof allowed) {
case 'boolean':
if (allowed === false)
delete set.headers['Server-Timing']
case "boolean":
if (allowed === false) delete set.headers["Server-Timing"];
set.headers['Server-Timing'] = label
set.headers["Server-Timing"] = label;
break
break;
case 'function':
case "function":
if ((await allowed(context)) === false)
delete set.headers['Server-Timing']
delete set.headers["Server-Timing"];
set.headers['Server-Timing'] = label
set.headers["Server-Timing"] = label;
break
break;
default:
set.headers['Server-Timing'] = label
set.headers["Server-Timing"] = label;
}
})
})
});
});
}
)
return app
}
);
return app;
};
export default serverTiming
export default serverTiming;

View File

@ -2,60 +2,57 @@ import { Elysia, t } from "elysia";
import { ip } from "elysia-ip";
import { verifyUser, createSession, getSessionExpirationDate } from "@elysia/lib/auth";
export const loginHandler = new Elysia({ prefix: "/auth" })
.use(ip())
.post(
"/session",
async ({ body, set, cookie, ip, request }) => {
const { username, password } = body;
export const loginHandler = new Elysia({ prefix: "/auth" }).use(ip()).post(
"/session",
async ({ body, set, cookie, ip, request }) => {
const { username, password } = body;
const user = await verifyUser(username, password);
if (!user) {
set.status = 401;
return { message: "Invalid credentials." };
}
const userAgent = request.headers.get("user-agent") || "Unknown";
const sessionId = await createSession(user.id, ip || null, userAgent);
const expiresAt = getSessionExpirationDate();
cookie.sessionId.value = sessionId;
cookie.sessionId.httpOnly = true;
cookie.sessionId.secure = process.env.NODE_ENV === 'production';
cookie.sessionId.sameSite = 'strict';
cookie.sessionId.expires = expiresAt;
return {
message: "You are logged in.",
user: {
id: user.id,
username: user.username,
nickname: user.nickname,
role: user.role
},
sessionID: sessionId
};
},
{
response: {
200: t.Object({
message: t.String(),
user: t.Object({
id: t.Integer(),
username: t.String(),
nickname: t.Optional(t.String()),
role: t.String()
}),
sessionID: t.String()
}),
401: t.Object({
message: t.String()
})
},
body: t.Object({
username: t.String(),
password: t.String()
})
const user = await verifyUser(username, password);
if (!user) {
set.status = 401;
return { message: "Invalid credentials." };
}
)
const userAgent = request.headers.get("user-agent") || "Unknown";
const sessionId = await createSession(user.id, ip || null, userAgent);
const expiresAt = getSessionExpirationDate();
cookie.sessionId.value = sessionId;
cookie.sessionId.httpOnly = true;
cookie.sessionId.secure = process.env.NODE_ENV === "production";
cookie.sessionId.sameSite = "strict";
cookie.sessionId.expires = expiresAt;
return {
message: "You are logged in.",
user: {
id: user.id,
username: user.username,
nickname: user.nickname,
role: user.role
},
sessionID: sessionId
};
},
{
response: {
200: t.Object({
message: t.String(),
user: t.Object({
id: t.Integer(),
username: t.String(),
nickname: t.Optional(t.String()),
role: t.String()
}),
sessionID: t.String()
}),
401: t.Object({
message: t.String()
})
},
body: t.Object({
username: t.String(),
password: t.String()
})
}
);

View File

@ -9,54 +9,53 @@ const SingerObj = t.Object({
message: t.Optional(t.String())
});
export const rootHandler = new Elysia()
.get(
"/",
async () => {
let singer: Singer | Singer[];
const shouldShowSpecialSinger = Math.random() < 0.016;
if (getSingerForBirthday().length !== 0) {
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
for (const s of singer) {
delete s.birthday;
s.message = `${s.name}生日快乐~`;
}
} else if (shouldShowSpecialSinger) {
singer = pickSpecialSinger();
} else {
singer = pickSinger();
}
return {
project: {
name: "中V档案馆",
mascot: "知夏",
quote: "星河知海夏生光"
},
status: 200,
version: VERSION,
time: Date.now(),
singer: singer
};
},
{
response: {
200: t.Object({
project: t.Object({
name: t.String(),
mascot: t.String(),
quote: t.String()
}),
status: t.Number(),
version: t.String(),
time: t.Number(),
singer: t.Union([SingerObj, t.Array(SingerObj)])
})
},
detail: {
summary: "Root route",
description:
"The root path. It returns a JSON object containing a random virtual singer, \
backend version, current server time and other miscellaneous information."
export const rootHandler = new Elysia().get(
"/",
async () => {
let singer: Singer | Singer[];
const shouldShowSpecialSinger = Math.random() < 0.016;
if (getSingerForBirthday().length !== 0) {
singer = JSON.parse(JSON.stringify(getSingerForBirthday())) as Singer[];
for (const s of singer) {
delete s.birthday;
s.message = `${s.name}生日快乐~`;
}
} else if (shouldShowSpecialSinger) {
singer = pickSpecialSinger();
} else {
singer = pickSinger();
}
)
return {
project: {
name: "中V档案馆",
mascot: "知夏",
quote: "星河知海夏生光"
},
status: 200,
version: VERSION,
time: Date.now(),
singer: singer
};
},
{
response: {
200: t.Object({
project: t.Object({
name: t.String(),
mascot: t.String(),
quote: t.String()
}),
status: t.Number(),
version: t.String(),
time: t.Number(),
singer: t.Union([SingerObj, t.Array(SingerObj)])
})
},
detail: {
summary: "Root route",
description:
"The root path. It returns a JSON object containing a random virtual singer, \
backend version, current server time and other miscellaneous information."
}
}
);

View File

@ -1,7 +1,6 @@
import { Elysia } from "elysia";
import { db } from "@core/drizzle";
import { bilibiliMetadata, latestVideoSnapshot, songs } from "@core/drizzle/main/schema";
import { eq, like, or } from "drizzle-orm";
import { db, bilibiliMetadata, latestVideoSnapshot, songs } from "@core/drizzle";
import { eq, like } from "drizzle-orm";
import { BiliAPIVideoMetadataSchema, BiliVideoSchema, SongSchema } from "@elysia/lib/schema";
import { z } from "zod";
import { getVideoInfo } from "@core/net/getVideoInfo";
@ -82,9 +81,8 @@ const getVideoSearchResult = async (searchQuery: string) => {
let data;
const cachedData = await retrieveVideoInfoFromCache(aid);
if (cachedData) {
data = cachedData
}
else {
data = cachedData;
} else {
data = await getVideoInfo(aid, "getVideoInfo");
if (typeof data === "number") return [];
const cacheKey = `cvsa:videoInfo:av${aid}`;

View File

@ -2,15 +2,14 @@ import { Elysia, t } from "elysia";
import { biliIDToAID } from "@elysia/lib/bilibiliID";
import { requireAuth } from "@elysia/middlewares/auth";
import { LatestVideosQueue } from "@elysia/lib/mq";
import { db } from "@core/drizzle";
import { songs } from "@core/drizzle/main/schema";
import { db, songs } from "@core/drizzle";
import { eq, and } from "drizzle-orm";
export const addSongHandler = new Elysia()
.use(requireAuth)
.post(
"/song/import/bilibili",
async ({ body, status }) => {
async ({ body, status, user }) => {
const id = body.id;
const aid = biliIDToAID(id);
if (!aid) {
@ -32,7 +31,8 @@ export const addSongHandler = new Elysia()
}
const job = await LatestVideosQueue.add("getVideoInfo", {
aid: aid,
insertSongs: true
insertSongs: true,
uid: user!.unqId
});
if (!job.id) {
return status(500, {
@ -76,7 +76,7 @@ export const addSongHandler = new Elysia()
result: {
message: "Video already exists in the songs table."
}
}
};
}
const job = await LatestVideosQueue.getJob(jobID);
if (!job) {

View File

@ -1,14 +1,19 @@
import { Elysia, t } from "elysia";
import { requireAuth } from "@elysia/middlewares/auth";
import { db } from "@core/drizzle";
import { songs } from "@core/drizzle/main/schema";
import { songs, history, db } from "@core/drizzle";
import { eq } from "drizzle-orm";
export const deleteSongHandler = new Elysia({ prefix: "/song" }).use(requireAuth).delete(
"/:id",
async ({ params }) => {
async ({ params, user }) => {
const id = Number(params.id);
await db.update(songs).set({ deleted: true }).where(eq(songs.id, id));
await db.insert(history).values({
objectId: id,
changeType: "del-song",
changedBy: user!.unqId,
data: null
});
return {
message: `Successfully deleted song ${id}.`
};

View File

@ -1,6 +1,5 @@
import { Elysia, t } from "elysia";
import { dbMain } from "@core/drizzle";
import { relations, singer, songs } from "@core/drizzle/main/schema";
import { db, history, songs } from "@core/drizzle";
import { eq, and } from "drizzle-orm";
import { bv2av } from "@elysia/lib/bilibiliID";
import { requireAuth } from "@elysia/middlewares/auth";
@ -14,11 +13,7 @@ async function getSongIDFromBiliID(id: string) {
} else {
return null;
}
const songID = await dbMain
.select({ id: songs.id })
.from(songs)
.where(eq(songs.aid, aid))
.limit(1);
const songID = await db.select({ id: songs.id }).from(songs).where(eq(songs.aid, aid)).limit(1);
if (songID.length > 0) {
return songID[0].id;
}
@ -38,7 +33,7 @@ async function getSongID(id: string) {
}
async function getSongInfo(id: number) {
const songInfo = await dbMain
const songInfo = await db
.select()
.from(songs)
.where(and(eq(songs.id, id), eq(songs.deleted, false)))
@ -46,23 +41,6 @@ async function getSongInfo(id: number) {
return songInfo[0];
}
async function getSingers(id: number) {
const singers = await dbMain
.select({
singers: singer.name
})
.from(relations)
.innerJoin(singer, eq(relations.targetId, singer.id))
.where(
and(
eq(relations.sourceId, id),
eq(relations.sourceType, "song"),
eq(relations.relation, "sing")
)
);
return singers.map((singer) => singer.singers);
}
const songInfoGetHandler = new Elysia({ prefix: "/song" }).get(
"/:id/info",
async ({ params, status }) => {
@ -81,14 +59,12 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get(
message: "Given song cannot be found."
});
}
const singers = await getSingers(info.id);
return {
id: info.id,
name: info.name,
aid: info.aid,
producer: info.producer,
duration: info.duration,
singers: singers,
cover: info.image || undefined,
publishedAt: info.publishedAt
};
@ -101,9 +77,8 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get(
aid: t.Union([t.Number(), t.Null()]),
producer: t.Union([t.String(), t.Null()]),
duration: t.Union([t.Number(), t.Null()]),
singers: t.Array(t.String()),
cover: t.Optional(t.String()),
publishedAt: t.Union([t.String(), t.Null()]),
publishedAt: t.Union([t.String(), t.Null()])
}),
404: t.Object({
code: t.String(),
@ -127,7 +102,7 @@ const songInfoGetHandler = new Elysia({ prefix: "/song" }).get(
const songInfoUpdateHandler = new Elysia({ prefix: "/song" }).use(requireAuth).patch(
"/:id/info",
async ({ params, status, body }) => {
async ({ params, status, body, user }) => {
const id = params.id;
const songID = await getSongID(id);
if (!songID) {
@ -145,16 +120,22 @@ const songInfoUpdateHandler = new Elysia({ prefix: "/song" }).use(requireAuth).p
}
if (body.name) {
await dbMain.update(songs).set({ name: body.name }).where(eq(songs.id, songID));
await db.update(songs).set({ name: body.name }).where(eq(songs.id, songID));
}
if (body.producer) {
await dbMain
await db
.update(songs)
.set({ producer: body.producer })
.where(eq(songs.id, songID))
.returning();
}
const updatedData = await dbMain.select().from(songs).where(eq(songs.id, songID));
const updatedData = await db.select().from(songs).where(eq(songs.id, songID));
await db.insert(history).values({
objectId: songID,
changeType: "update-song",
changedBy: user!.unqId,
data: updatedData.length > 0 ? updatedData[0] : null
});
return {
message: "Successfully updated song info.",
updated: updatedData.length > 0 ? updatedData[0] : null

View File

@ -1,7 +1,6 @@
import { Elysia, t } from "elysia";
import { dbMain } from "@core/drizzle";
import { bilibiliMetadata, eta, latestVideoSnapshot } from "@core/drizzle/main/schema";
import { eq, and, gte, lt, desc } from "drizzle-orm";
import { db, bilibiliMetadata, eta } from "@core/drizzle";
import { eq, and, gte, lt } from "drizzle-orm";
import serverTiming from "@elysia/middlewares/timing";
import z from "zod";
import { BiliVideoSchema } from "@elysia/lib/schema";
@ -9,9 +8,9 @@ import { BiliVideoSchema } from "@elysia/lib/schema";
type MileStoneType = "dendou" | "densetsu" | "shinwa";
const range = {
dendou: [90000, 99999, 2160],
densetsu: [900000, 999999, 8760],
shinwa: [5000000, 9999999, 87600]
dendou: [0, 100000, 2160],
densetsu: [100000, 1000000, 8760],
shinwa: [1000000, 10000000, 43800]
};
export const closeMileStoneHandler = new Elysia({ prefix: "/songs" }).use(serverTiming()).get(
@ -21,7 +20,7 @@ export const closeMileStoneHandler = new Elysia({ prefix: "/songs" }).use(server
const type = params.type;
const min = range[type as MileStoneType][0];
const max = range[type as MileStoneType][1];
return dbMain
return db
.select()
.from(eta)
.innerJoin(bilibiliMetadata, eq(bilibiliMetadata.aid, eta.aid))

View File

@ -1,6 +1,5 @@
import { Elysia, t } from "elysia";
import { db } from "@core/drizzle";
import { eta } from "@core/drizzle/main/schema";
import { db, eta } from "@core/drizzle";
import { eq } from "drizzle-orm";
import { biliIDToAID } from "@elysia/lib/bilibiliID";

View File

@ -1,6 +1,5 @@
import { Elysia, t } from "elysia";
import { dbMain } from "@core/drizzle";
import { videoSnapshot } from "@core/drizzle/main/schema";
import { db, videoSnapshot } from "@core/drizzle";
import { bv2av } from "@elysia/lib/bilibiliID";
import { getVideoInfo } from "@core/net/getVideoInfo";
import { redis } from "@core/db/redis";
@ -31,7 +30,7 @@ async function insertVideoSnapshot(data: VideoInfoData) {
const favorites = data.stat.favorite;
const aid = data.aid;
await dbMain.insert(videoSnapshot).values({
await db.insert(videoSnapshot).values({
aid,
views,
danmakus,

View File

@ -1,6 +1,5 @@
import { Elysia } from "elysia";
import { dbMain } from "@core/drizzle";
import { videoSnapshot } from "@core/drizzle/main/schema";
import { db, videoSnapshot } from "@core/drizzle";
import { bv2av } from "@elysia/lib/bilibiliID";
import { ErrorResponseSchema } from "@elysia/src/schema";
import { eq, desc } from "drizzle-orm";
@ -26,7 +25,7 @@ export const getVideoSnapshotsHandler = new Elysia({ prefix: "/video" }).get(
});
}
const data = await dbMain
const data = await db
.select()
.from(videoSnapshot)
.where(eq(videoSnapshot.aid, aid))

View File

@ -1,4 +1,4 @@
import { Elysia } from "elysia";
import { Elysia, file } from "elysia";
import { getBindingInfo, logStartup } from "./startMessage";
import { pingHandler } from "@elysia/routes/ping";
import openapi from "@elysiajs/openapi";
@ -14,6 +14,7 @@ import { getVideoSnapshotsHandler } from "@elysia/routes/video/snapshots";
import { addSongHandler } from "@elysia/routes/song/add";
import { deleteSongHandler } from "@elysia/routes/song/delete";
import { songEtaHandler } from "@elysia/routes/video/eta";
import "./mq";
const [host, port] = getBindingInfo();
logStartup(host, port);
@ -45,6 +46,14 @@ const app = new Elysia({
.use(addSongHandler)
.use(deleteSongHandler)
.use(songEtaHandler)
.get("/a", () => file("public/background.jpg"))
.get("/song/:id", ({ redirect, params }) => {
console.log(`/song/${params.id}/info`);
return redirect(`/song/${params.id}/info`, 302);
})
.get("/video/:id", ({ redirect, params }) => {
return redirect(`/video/${params.id}/info`, 302);
})
.listen(15412);
export const VERSION = "0.7.0";

View File

@ -0,0 +1,21 @@
import { db, history } from "@core/drizzle";
import { ConnectionOptions, QueueEvents, QueueEventsListener } from "bullmq";
import { redis } from "bun";
interface CustomListener extends QueueEventsListener {
addSong: (args: { uid: string; songID: number }, id: string) => void;
}
const queueEvents = new QueueEvents("latestVideos", {
connection: redis as ConnectionOptions
});
queueEvents.on<CustomListener>(
"addSong",
async ({ uid, songID }: { uid: string; songID: number }) => {
await db.insert(history).values({
objectId: songID,
changeType: "add-song",
changedBy: uid,
data: null
});
}
);

View File

@ -1,4 +1,4 @@
import Elysia from "elysia";
import Elysia, { ElysiaFile } from "elysia";
const encoder = new TextEncoder();
@ -11,20 +11,14 @@ export const onAfterHandler = new Elysia().onAfterHandle(
const requestJson = contentType.includes("application/json");
const isBrowser =
!requestJson && (accept.includes("text/html") || secFetchMode === "navigate");
const responseValueType = typeof responseValue;
const isObject = responseValueType === "object";
const isObject = typeof responseValue === "object";
if (!isObject) {
const response = {
message: responseValue
};
const text = isBrowser ? JSON.stringify(response, null, 2) : JSON.stringify(response);
return new Response(encoder.encode(text), {
headers: {
"Content-Type": "application/json; charset=utf-8"
}
});
return;
}
const realResponse = responseValue as Record<string, unknown>;
if (responseValue instanceof ElysiaFile || responseValue instanceof Response) {
return;
}
const realResponse = responseValue as Record<string, any>;
if (realResponse.code) {
const text = isBrowser
? JSON.stringify(realResponse.response, null, 2)

View File

@ -1,8 +1,4 @@
import postgres from "postgres";
import { postgresConfigCred, postgresConfig } from "./pgConfigNew";
import { postgresConfig } from "./pgConfigNew";
export const sql = postgres(postgresConfig);
export const sqlCred = postgres(postgresConfigCred);
export const sqlTest = postgres(postgresConfig);
export const sql = postgres(postgresConfig);

View File

@ -1,8 +1,8 @@
import { dbMain } from "@core/drizzle";
import { db } from "@core/drizzle";
import { sql } from "drizzle-orm";
export const getClosestSnapshot = async (aid: number, targetTime: Date) => {
const closest = await dbMain.execute<{ created_at: Date; views: number }>(sql`
const closest = await db.execute<{ created_at: Date; views: number }>(sql`
SELECT created_at, views
FROM (
(SELECT created_at, views, 'later' AS type

View File

@ -1,9 +1,12 @@
import { dbMain } from "@core/drizzle";
import { latestVideoSnapshot } from "@core/drizzle/main/schema";
import { db, latestVideoSnapshot } from "@core/drizzle";
import { eq } from "drizzle-orm";
export const getLatestSnapshot = async (aid: number) => {
const result = await dbMain.select().from(latestVideoSnapshot).where(eq(latestVideoSnapshot.aid, aid)).limit(1);
const result = await db
.select()
.from(latestVideoSnapshot)
.where(eq(latestVideoSnapshot.aid, aid))
.limit(1);
if (result.length === 0) {
return null;
}

View File

@ -1,5 +1,4 @@
import { dbMain } from "@core/drizzle";
import { eta as etaTable } from "@core/drizzle/main/schema";
import { db, eta as etaTable } from "@core/drizzle";
import { eq } from "drizzle-orm";
import { MINUTE, HOUR, getClosetMilestone } from "@core/lib";
import { getLatestSnapshot, getClosestSnapshot } from "@core/db";
@ -34,7 +33,7 @@ export const getGroundTruthMilestoneETA = async (
};
export const getMilestoneETA = async (aid: number) => {
const data = await dbMain.select().from(etaTable).where(eq(etaTable.aid, aid)).limit(1);
const data = await db.select().from(etaTable).where(eq(etaTable.aid, aid)).limit(1);
if (data.length > 0) {
return data[0].eta;
}

View File

@ -1,44 +0,0 @@
-- Current sql file was generated after introspecting the database
-- If you want to run this migration please uncomment this code before executing migrations
/*
CREATE SEQUENCE "public"."captcha_difficulty_settings_id_seq" INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1;--> statement-breakpoint
CREATE SEQUENCE "public"."users_id_seq" INCREMENT BY 1 MINVALUE 1 MAXVALUE 2147483647 START WITH 1 CACHE 1;--> statement-breakpoint
CREATE TABLE "captcha_difficulty_settings" (
"id" integer DEFAULT nextval('captcha_difficulty_settings_id_seq'::regclass) NOT NULL,
"method" text NOT NULL,
"path" text NOT NULL,
"duration" real NOT NULL,
"threshold" integer NOT NULL,
"difficulty" integer NOT NULL,
"global" boolean NOT NULL
);
--> statement-breakpoint
CREATE TABLE "login_sessions" (
"id" text NOT NULL,
"uid" integer NOT NULL,
"created_at" timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL,
"expire_at" timestamp with time zone,
"last_used_at" timestamp with time zone,
"ip_address" "inet",
"user_agent" text,
"deactivated_at" timestamp with time zone
);
--> statement-breakpoint
CREATE TABLE "users" (
"id" integer DEFAULT nextval('users_id_seq'::regclass) NOT NULL,
"nickname" text,
"username" text NOT NULL,
"password" text NOT NULL,
"unq_id" text DEFAULT gen_random_uuid() NOT NULL,
"role" text DEFAULT 'USER' NOT NULL,
"created_at" timestamp with time zone DEFAULT CURRENT_TIMESTAMP NOT NULL
);
--> statement-breakpoint
CREATE UNIQUE INDEX "captcha_difficulty_settings_pkey" ON "captcha_difficulty_settings" USING btree ("id" int4_ops);--> statement-breakpoint
CREATE INDEX "inx_login-sessions_uid" ON "login_sessions" USING btree ("uid" int4_ops);--> statement-breakpoint
CREATE UNIQUE INDEX "login_sessions_pkey" ON "login_sessions" USING btree ("id" text_ops);--> statement-breakpoint
CREATE UNIQUE INDEX "users_pkey" ON "users" USING btree ("id" int4_ops);--> statement-breakpoint
CREATE UNIQUE INDEX "users_pkey1" ON "users" USING btree ("id" int4_ops);--> statement-breakpoint
CREATE UNIQUE INDEX "users_unq_id_key" ON "users" USING btree ("unq_id" text_ops);--> statement-breakpoint
CREATE UNIQUE INDEX "users_username_key" ON "users" USING btree ("username" text_ops);
*/

View File

@ -1,350 +0,0 @@
{
"id": "00000000-0000-0000-0000-000000000000",
"prevId": "",
"version": "7",
"dialect": "postgresql",
"tables": {
"public.captcha_difficulty_settings": {
"name": "captcha_difficulty_settings",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": false,
"notNull": true,
"default": "nextval('captcha_difficulty_settings_id_seq'::regclass)"
},
"method": {
"name": "method",
"type": "text",
"primaryKey": false,
"notNull": true
},
"path": {
"name": "path",
"type": "text",
"primaryKey": false,
"notNull": true
},
"duration": {
"name": "duration",
"type": "real",
"primaryKey": false,
"notNull": true
},
"threshold": {
"name": "threshold",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"difficulty": {
"name": "difficulty",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"global": {
"name": "global",
"type": "boolean",
"primaryKey": false,
"notNull": true
}
},
"indexes": {
"captcha_difficulty_settings_pkey": {
"name": "captcha_difficulty_settings_pkey",
"columns": [
{
"expression": "id",
"asc": true,
"nulls": "last",
"opclass": "int4_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {},
"policies": {},
"isRLSEnabled": false
},
"public.login_sessions": {
"name": "login_sessions",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "text",
"primaryKey": false,
"notNull": true
},
"uid": {
"name": "uid",
"type": "integer",
"primaryKey": false,
"notNull": true
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "CURRENT_TIMESTAMP"
},
"expire_at": {
"name": "expire_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"last_used_at": {
"name": "last_used_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
},
"ip_address": {
"name": "ip_address",
"type": "inet",
"primaryKey": false,
"notNull": false
},
"user_agent": {
"name": "user_agent",
"type": "text",
"primaryKey": false,
"notNull": false
},
"deactivated_at": {
"name": "deactivated_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": false
}
},
"indexes": {
"inx_login-sessions_uid": {
"name": "inx_login-sessions_uid",
"columns": [
{
"expression": "uid",
"asc": true,
"nulls": "last",
"opclass": "int4_ops",
"isExpression": false
}
],
"isUnique": false,
"concurrently": false,
"method": "btree",
"with": {}
},
"login_sessions_pkey": {
"name": "login_sessions_pkey",
"columns": [
{
"expression": "id",
"asc": true,
"nulls": "last",
"opclass": "text_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {},
"policies": {},
"isRLSEnabled": false
},
"public.users": {
"name": "users",
"schema": "",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": false,
"notNull": true,
"default": "nextval('users_id_seq'::regclass)"
},
"nickname": {
"name": "nickname",
"type": "text",
"primaryKey": false,
"notNull": false
},
"username": {
"name": "username",
"type": "text",
"primaryKey": false,
"notNull": true
},
"password": {
"name": "password",
"type": "text",
"primaryKey": false,
"notNull": true
},
"unq_id": {
"name": "unq_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "gen_random_uuid()"
},
"role": {
"name": "role",
"type": "text",
"primaryKey": false,
"notNull": true,
"default": "'USER'"
},
"created_at": {
"name": "created_at",
"type": "timestamp with time zone",
"primaryKey": false,
"notNull": true,
"default": "CURRENT_TIMESTAMP"
}
},
"indexes": {
"users_pkey": {
"name": "users_pkey",
"columns": [
{
"expression": "id",
"asc": true,
"nulls": "last",
"opclass": "int4_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
},
"users_pkey1": {
"name": "users_pkey1",
"columns": [
{
"expression": "id",
"asc": true,
"nulls": "last",
"opclass": "int4_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
},
"users_unq_id_key": {
"name": "users_unq_id_key",
"columns": [
{
"expression": "unq_id",
"asc": true,
"nulls": "last",
"opclass": "text_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
},
"users_username_key": {
"name": "users_username_key",
"columns": [
{
"expression": "username",
"asc": true,
"nulls": "last",
"opclass": "text_ops",
"isExpression": false
}
],
"isUnique": true,
"concurrently": false,
"method": "btree",
"with": {}
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {},
"policies": {},
"isRLSEnabled": false
}
},
"enums": {},
"schemas": {},
"sequences": {
"public.captcha_difficulty_settings_id_seq": {
"name": "captcha_difficulty_settings_id_seq",
"schema": "public",
"startWith": "1",
"minValue": "1",
"maxValue": "2147483647",
"increment": "1",
"cycle": false,
"cache": "1"
},
"public.users_id_seq": {
"name": "users_id_seq",
"schema": "public",
"startWith": "1",
"minValue": "1",
"maxValue": "2147483647",
"increment": "1",
"cycle": false,
"cache": "1"
}
},
"roles": {},
"policies": {},
"views": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"tables": {
"captcha_difficulty_settings": {
"columns": {
"id": {
"isDefaultAnExpression": true
}
}
},
"users": {
"columns": {
"id": {
"isDefaultAnExpression": true
}
}
}
}
}
}

View File

@ -1,13 +0,0 @@
{
"version": "7",
"dialect": "postgresql",
"entries": [
{
"idx": 0,
"version": "7",
"when": 1750513073792,
"tag": "0000_moaning_shotgun",
"breakpoints": true
}
]
}

View File

@ -1,2 +0,0 @@
import { relations } from "drizzle-orm/relations";
import {} from "./schema";

View File

@ -1,93 +0,0 @@
import {
pgTable,
uniqueIndex,
integer,
text,
real,
boolean,
index,
timestamp,
inet,
pgSequence
} from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
// Explicit sequences backing the integer PK columns of the tables below
// (the columns use `nextval(...)` defaults instead of `serial`, so the
// sequences must be declared so drizzle-kit tracks them).
export const captchaDifficultySettingsIdSeq = pgSequence("captcha_difficulty_settings_id_seq", {
	startWith: "1",
	increment: "1",
	minValue: "1",
	maxValue: "2147483647",
	cache: "1",
	cycle: false
});
// Sequence for users.id; bounds match a 32-bit signed integer.
export const usersIdSeq = pgSequence("users_id_seq", {
	startWith: "1",
	increment: "1",
	minValue: "1",
	maxValue: "2147483647",
	cache: "1",
	cycle: false
});
/**
 * Per-route captcha difficulty tuning keyed by (method, path).
 * `global` presumably marks a row that applies to every route — TODO confirm.
 * Introspection models the primary key as a unique index ("..._pkey")
 * rather than a `primaryKey()` column.
 */
export const captchaDifficultySettings = pgTable(
	"captcha_difficulty_settings",
	{
		id: integer()
			.default(sql`nextval('captcha_difficulty_settings_id_seq'::regclass)`)
			.notNull(),
		method: text().notNull(),
		path: text().notNull(),
		duration: real().notNull(),
		threshold: integer().notNull(),
		difficulty: integer().notNull(),
		global: boolean().notNull()
	},
	(table) => [
		uniqueIndex("captcha_difficulty_settings_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops"))
	]
);
/**
 * Login sessions. `id` is the session token (text, unique via the
 * "login_sessions_pkey" index). NULL `expireAt` / `deactivatedAt` mean
 * no expiry / still active. Timestamps use mode "string" (ISO strings
 * at the JS boundary, timestamptz in Postgres).
 */
export const loginSessions = pgTable(
	"login_sessions",
	{
		id: text().notNull(),
		uid: integer().notNull(),
		createdAt: timestamp("created_at", { withTimezone: true, mode: "string" })
			.default(sql`CURRENT_TIMESTAMP`)
			.notNull(),
		expireAt: timestamp("expire_at", { withTimezone: true, mode: "string" }),
		lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: "string" }),
		ipAddress: inet("ip_address"),
		userAgent: text("user_agent"),
		deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: "string" })
	},
	(table) => [
		// Supports lookups of all sessions belonging to a user.
		index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")),
		uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops"))
	]
);
/**
 * User accounts. `unqId` is a server-generated UUID alternate key
 * (default gen_random_uuid(), unique via "users_unq_id_key") distinct
 * from the sequence-backed integer `id`.
 */
export const users = pgTable(
	"users",
	{
		id: integer()
			.default(sql`nextval('users_id_seq'::regclass)`)
			.notNull(),
		nickname: text(),
		username: text().notNull(),
		password: text().notNull(),
		unqId: text("unq_id")
			.default(sql`gen_random_uuid()`)
			.notNull(),
		role: text().default("USER").notNull(),
		createdAt: timestamp("created_at", { withTimezone: true, mode: "string" })
			.default(sql`CURRENT_TIMESTAMP`)
			.notNull()
	},
	(table) => [
		uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")),
		// NOTE(review): "users_pkey1" duplicates "users_pkey" on the same
		// column — likely a database artifact; confirm and drop one.
		uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")),
		uniqueIndex("users_unq_id_key").using("btree", table.unqId.asc().nullsLast().op("text_ops")),
		uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops"))
	]
);

View File

@ -1,8 +1,8 @@
"use server";
import { drizzle } from "drizzle-orm/postgres-js";
import { sqlCred, sql } from "@core/db/dbNew";
import { sql } from "@core/db/dbNew";
export const dbMain = drizzle(sql);
export const dbCred = drizzle(sqlCred);
export const db = drizzle(sql);
export const db = drizzle(sql);
export * from "./main/schema";
export * from "./type";

View File

@ -1,24 +1,15 @@
import { relations } from "drizzle-orm/relations";
import { songs, relationSinger, singer, relationsProducer } from "./schema";
import { usersInCredentials, history, songs, relationsProducer, singer, relationSinger } from "./schema";
export const relationSingerRelations = relations(relationSinger, ({one}) => ({
song: one(songs, {
fields: [relationSinger.songId],
references: [songs.id]
}),
singer: one(singer, {
fields: [relationSinger.singerId],
references: [singer.id]
export const historyRelations = relations(history, ({one}) => ({
usersInCredential: one(usersInCredentials, {
fields: [history.changedBy],
references: [usersInCredentials.unqId]
}),
}));
export const songsRelations = relations(songs, ({many}) => ({
relationSingers: many(relationSinger),
relationsProducers: many(relationsProducer),
}));
export const singerRelations = relations(singer, ({many}) => ({
relationSingers: many(relationSinger),
export const usersInCredentialsRelations = relations(usersInCredentials, ({many}) => ({
histories: many(history),
}));
export const relationsProducerRelations = relations(relationsProducer, ({one}) => ({
@ -26,4 +17,24 @@ export const relationsProducerRelations = relations(relationsProducer, ({one}) =
fields: [relationsProducer.songId],
references: [songs.id]
}),
}));
export const songsRelations = relations(songs, ({many}) => ({
relationsProducers: many(relationsProducer),
relationSingers: many(relationSinger),
}));
export const relationSingerRelations = relations(relationSinger, ({one}) => ({
singer: one(singer, {
fields: [relationSinger.singerId],
references: [singer.id]
}),
song: one(songs, {
fields: [relationSinger.songId],
references: [songs.id]
}),
}));
export const singerRelations = relations(singer, ({many}) => ({
relationSingers: many(relationSinger),
}));

View File

@ -1,4 +1,4 @@
import { pgTable, index, unique, serial, bigint, text, timestamp, uniqueIndex, integer, varchar, smallint, jsonb, boolean, foreignKey, bigserial, real, pgSchema, inet, pgSequence } from "drizzle-orm/pg-core"
import { pgTable, pgSchema, uniqueIndex, check, integer, text, timestamp, foreignKey, serial, bigint, jsonb, index, inet, varchar, smallint, real, unique, boolean, bigserial, pgSequence } from "drizzle-orm/pg-core"
import { sql } from "drizzle-orm"
export const credentials = pgSchema("credentials");
@ -6,28 +6,69 @@ export const userRoleInCredentials = credentials.enum("user_role", ['ADMIN', 'US
export const allDataIdSeq = pgSequence("all_data_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const labelingResultIdSeq = pgSequence("labeling_result_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const relationSingerIdSeq = pgSequence("relation_singer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const relationsProducerIdSeq = pgSequence("relations_producer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const songsIdSeq = pgSequence("songs_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const videoSnapshotIdSeq = pgSequence("video_snapshot_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const viewsIncrementRateIdSeq = pgSequence("views_increment_rate_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "9223372036854775807", cache: "1", cycle: false })
export const relationSingerIdSeq = pgSequence("relation_singer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const relationsProducerIdSeq = pgSequence("relations_producer_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const usersIdSeqInCredentials = credentials.sequence("users_id_seq", { startWith: "1", increment: "1", minValue: "1", maxValue: "2147483647", cache: "1", cycle: false })
export const relations = pgTable("relations", {
export const usersInCredentials = credentials.table("users", {
id: integer().default(sql`nextval('credentials.users_id_seq'::regclass)`).notNull(),
nickname: text(),
username: text().notNull(),
password: text().notNull(),
unqId: text("unq_id").notNull(),
role: userRoleInCredentials().default('USER').notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")),
uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")),
uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")),
check("users_id_not_null", sql`NOT NULL id`),
check("users_username_not_null", sql`NOT NULL username`),
check("users_password_not_null", sql`NOT NULL password`),
check("users_unq_id_not_null", sql`NOT NULL unq_id`),
check("users_role_not_null", sql`NOT NULL role`),
check("users_created_at_not_null", sql`NOT NULL created_at`),
]);
export const history = pgTable("history", {
id: serial().primaryKey().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
sourceId: bigint("source_id", { mode: "number" }).notNull(),
sourceType: text("source_type").notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
targetId: bigint("target_id", { mode: "number" }).notNull(),
targetType: text("target_type").notNull(),
relation: text().notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
objectId: bigint("object_id", { mode: "number" }).notNull(),
changeType: text("change_type").notNull(),
changedAt: timestamp("changed_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
changedBy: text("changed_by").notNull(),
data: jsonb(),
}, (table) => [
index("idx_relations_source_id_source_type_relation").using("btree", table.sourceId.asc().nullsLast().op("int8_ops"), table.sourceType.asc().nullsLast().op("int8_ops"), table.relation.asc().nullsLast().op("text_ops")),
index("idx_relations_target_id_target_type_relation").using("btree", table.targetId.asc().nullsLast().op("text_ops"), table.targetType.asc().nullsLast().op("text_ops"), table.relation.asc().nullsLast().op("text_ops")),
unique("unq_relations").on(table.sourceId, table.sourceType, table.targetId, table.targetType, table.relation),
foreignKey({
columns: [table.changedBy],
foreignColumns: [usersInCredentials.unqId],
name: "rel_history_changed_by"
}),
check("history_id_not_null", sql`NOT NULL id`),
check("history_object_id_not_null", sql`NOT NULL object_id`),
check("history_change_type_not_null", sql`NOT NULL change_type`),
check("history_changed_at_not_null", sql`NOT NULL changed_at`),
check("history_changed_by_not_null", sql`NOT NULL changed_by`),
]);
export const loginSessionsInCredentials = credentials.table("login_sessions", {
id: text().notNull(),
uid: integer().notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
expireAt: timestamp("expire_at", { withTimezone: true, mode: 'string' }),
lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: 'string' }),
ipAddress: inet("ip_address"),
userAgent: text("user_agent"),
deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: 'string' }),
}, (table) => [
index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")),
uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")),
check("login_sessions_id_not_null", sql`NOT NULL id`),
check("login_sessions_uid_not_null", sql`NOT NULL uid`),
check("login_sessions_created_at_not_null", sql`NOT NULL created_at`),
]);
export const bilibiliMetadata = pgTable("bilibili_metadata", {
@ -52,6 +93,9 @@ export const bilibiliMetadata = pgTable("bilibili_metadata", {
index("idx_all-data_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")),
index("idx_bili-meta_status").using("btree", table.status.asc().nullsLast().op("int4_ops")),
uniqueIndex("unq_all-data_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")),
check("bilibili_metadata_id_not_null", sql`NOT NULL id`),
check("bilibili_metadata_aid_not_null", sql`NOT NULL aid`),
check("bilibili_metadata_status_not_null", sql`NOT NULL status`),
]);
export const humanClassifiedLables = pgTable("human_classified_lables", {
@ -66,22 +110,41 @@ export const humanClassifiedLables = pgTable("human_classified_lables", {
index("idx_classified-labels-human_author").using("btree", table.uid.asc().nullsLast().op("int4_ops")),
index("idx_classified-labels-human_created-at").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")),
index("idx_classified-labels-human_label").using("btree", table.label.asc().nullsLast().op("int2_ops")),
check("human_classified_lables_id_not_null", sql`NOT NULL id`),
check("human_classified_lables_aid_not_null", sql`NOT NULL aid`),
check("human_classified_lables_uid_not_null", sql`NOT NULL uid`),
check("human_classified_lables_label_not_null", sql`NOT NULL label`),
check("human_classified_lables_created_at_not_null", sql`NOT NULL created_at`),
]);
export const singer = pgTable("singer", {
id: serial().primaryKey().notNull(),
name: text().notNull(),
});
export const history = pgTable("history", {
id: serial().primaryKey().notNull(),
export const videoSnapshot = pgTable("video_snapshot", {
id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
views: integer().notNull(),
coins: integer(),
likes: integer(),
favorites: integer(),
shares: integer(),
danmakus: integer(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
objectId: bigint("object_id", { mode: "number" }).notNull(),
changeType: text("change_type").notNull(),
changedAt: timestamp("changed_at", { withTimezone: true, mode: 'string' }).notNull(),
changedBy: integer("changed_by").notNull(),
data: jsonb().notNull(),
});
aid: bigint({ mode: "number" }).notNull(),
replies: integer(),
}, (table) => [
index("idx_vid_snapshot_aid_created_at").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("int8_ops")),
index("idx_vid_snapshot_time").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")),
check("video_snapshot_id_not_null", sql`NOT NULL id`),
check("video_snapshot_created_at_not_null", sql`NOT NULL created_at`),
check("video_snapshot_views_not_null", sql`NOT NULL views`),
check("video_snapshot_aid_not_null", sql`NOT NULL aid`),
]);
export const producer = pgTable("producer", {
id: integer().primaryKey().notNull(),
name: text().notNull(),
}, (table) => [
check("producer_id_not_null", sql`NOT NULL id`),
check("producer_name_not_null", sql`NOT NULL name`),
]);
export const labellingResult = pgTable("labelling_result", {
id: integer().default(sql`nextval('labeling_result_id_seq'::regclass)`).notNull(),
@ -97,6 +160,11 @@ export const labellingResult = pgTable("labelling_result", {
index("idx_labelling_aid-label").using("btree", table.aid.asc().nullsLast().op("int2_ops"), table.label.asc().nullsLast().op("int2_ops")),
uniqueIndex("labeling_result_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")),
uniqueIndex("unq_labelling-result_aid_model-version").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.modelVersion.asc().nullsLast().op("int8_ops")),
check("labelling_result_id_not_null", sql`NOT NULL id`),
check("labelling_result_aid_not_null", sql`NOT NULL aid`),
check("labelling_result_label_not_null", sql`NOT NULL label`),
check("labelling_result_model_version_not_null", sql`NOT NULL model_version`),
check("labelling_result_created_at_not_null", sql`NOT NULL created_at`),
]);
export const latestVideoSnapshot = pgTable("latest_video_snapshot", {
@ -113,6 +181,80 @@ export const latestVideoSnapshot = pgTable("latest_video_snapshot", {
}, (table) => [
index("idx_latest-video-snapshot_time").using("btree", table.time.asc().nullsLast().op("timestamptz_ops")),
index("idx_latest-video-snapshot_views").using("btree", table.views.asc().nullsLast().op("int4_ops")),
check("latest_video_snapshot_aid_not_null", sql`NOT NULL aid`),
check("latest_video_snapshot_time_not_null", sql`NOT NULL "time"`),
check("latest_video_snapshot_views_not_null", sql`NOT NULL views`),
check("latest_video_snapshot_coins_not_null", sql`NOT NULL coins`),
check("latest_video_snapshot_likes_not_null", sql`NOT NULL likes`),
check("latest_video_snapshot_favorites_not_null", sql`NOT NULL favorites`),
check("latest_video_snapshot_replies_not_null", sql`NOT NULL replies`),
check("latest_video_snapshot_danmakus_not_null", sql`NOT NULL danmakus`),
check("latest_video_snapshot_shares_not_null", sql`NOT NULL shares`),
]);
export const relationsProducer = pgTable("relations_producer", {
id: integer().default(sql`nextval('relations_producer_id_seq'::regclass)`).primaryKey().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
songId: bigint("song_id", { mode: "number" }).notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
producerId: integer("producer_id").notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
foreignKey({
columns: [table.songId],
foreignColumns: [songs.id],
name: "fkey_relations_producer_songs_id"
}),
check("relations_producer_id_not_null", sql`NOT NULL id`),
check("relations_producer_song_id_not_null", sql`NOT NULL song_id`),
check("relations_producer_created_at_not_null", sql`NOT NULL created_at`),
check("relations_producer_producer_id_not_null", sql`NOT NULL producer_id`),
check("relations_producer_updated_at_not_null", sql`NOT NULL updated_at`),
]);
export const eta = pgTable("eta", {
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
aid: bigint({ mode: "number" }).primaryKey().notNull(),
eta: real().notNull(),
speed: real().notNull(),
currentViews: integer("current_views").notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(),
}, (table) => [
index("idx_eta_eta_current_views").using("btree", table.eta.asc().nullsLast().op("int4_ops"), table.currentViews.asc().nullsLast().op("int4_ops")),
check("eta_aid_not_null", sql`NOT NULL aid`),
check("eta_eta_not_null", sql`NOT NULL eta`),
check("eta_speed_not_null", sql`NOT NULL speed`),
check("eta_current_views_not_null", sql`NOT NULL current_views`),
check("eta_updated_at_not_null", sql`NOT NULL updated_at`),
]);
export const bilibiliUser = pgTable("bilibili_user", {
id: serial().primaryKey().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
uid: bigint({ mode: "number" }).notNull(),
username: text().notNull(),
desc: text().notNull(),
fans: integer().notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
index("idx_bili-user_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")),
unique("unq_bili-user_uid").on(table.uid),
check("bilibili_user_id_not_null", sql`NOT NULL id`),
check("bilibili_user_uid_not_null", sql`NOT NULL uid`),
check("bilibili_user_username_not_null", sql`NOT NULL username`),
check("bilibili_user_desc_not_null", sql`NOT NULL "desc"`),
check("bilibili_user_fans_not_null", sql`NOT NULL fans`),
check("bilibili_user_created_at_not_null", sql`NOT NULL created_at`),
check("bilibili_user_updated_at_not_null", sql`NOT NULL updated_at`),
]);
export const singer = pgTable("singer", {
id: serial().primaryKey().notNull(),
name: text().notNull(),
}, (table) => [
check("singer_id_not_null", sql`NOT NULL id`),
check("singer_name_not_null", sql`NOT NULL name`),
]);
export const songs = pgTable("songs", {
@ -131,71 +273,16 @@ export const songs = pgTable("songs", {
image: text(),
producer: text(),
}, (table) => [
index("idx_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")),
index("idx_hash_songs_aid").using("hash", table.aid.asc().nullsLast().op("int8_ops")),
index("idx_netease_id").using("btree", table.neteaseId.asc().nullsLast().op("int8_ops")),
index("idx_published_at").using("btree", table.publishedAt.asc().nullsLast().op("timestamptz_ops")),
index("idx_type").using("btree", table.type.asc().nullsLast().op("int2_ops")),
uniqueIndex("unq_songs_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")),
uniqueIndex("unq_songs_netease_id").using("btree", table.neteaseId.asc().nullsLast().op("int8_ops")),
]);
export const videoSnapshot = pgTable("video_snapshot", {
id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
views: integer().notNull(),
coins: integer(),
likes: integer(),
favorites: integer(),
shares: integer(),
danmakus: integer(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
aid: bigint({ mode: "number" }).notNull(),
replies: integer(),
}, (table) => [
index("idx_vid_snapshot_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")),
index("idx_vid_snapshot_aid_created_at").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("timestamptz_ops")),
index("idx_vid_snapshot_time").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")),
index("idx_vid_snapshot_views").using("btree", table.views.asc().nullsLast().op("int4_ops")),
uniqueIndex("video_snapshot_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")),
]);
export const bilibiliUser = pgTable("bilibili_user", {
id: serial().primaryKey().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
uid: bigint({ mode: "number" }).notNull(),
username: text().notNull(),
desc: text().notNull(),
fans: integer().notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
index("idx_bili-user_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")),
unique("unq_bili-user_uid").on(table.uid),
]);
export const producer = pgTable("producer", {
id: integer().primaryKey().notNull(),
name: text().notNull(),
});
export const relationSinger = pgTable("relation_singer", {
id: integer().default(sql`nextval('relation_singer_id_seq'::regclass)`).primaryKey().notNull(),
songId: integer("song_id").notNull(),
singerId: integer("singer_id").notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
foreignKey({
columns: [table.songId],
foreignColumns: [songs.id],
name: "fkey_song_id"
}),
foreignKey({
columns: [table.singerId],
foreignColumns: [singer.id],
name: "fkey_singer_id"
}),
check("songs_id_not_null", sql`NOT NULL id`),
check("songs_created_at_not_null", sql`NOT NULL created_at`),
check("songs_updated_at_not_null", sql`NOT NULL updated_at`),
check("songs_deleted_not_null", sql`NOT NULL deleted`),
]);
export const snapshotSchedule = pgTable("snapshot_schedule", {
@ -208,65 +295,58 @@ export const snapshotSchedule = pgTable("snapshot_schedule", {
finishedAt: timestamp("finished_at", { withTimezone: true, mode: 'string' }),
status: text().default('pending').notNull(),
}, (table) => [
index("idx_snapshot_schedule_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")),
index("idx_snapshot_schedule_started-at_status_type").using("btree", table.startedAt.asc().nullsLast().op("text_ops"), table.status.asc().nullsLast().op("timestamptz_ops"), table.type.asc().nullsLast().op("timestamptz_ops")),
index("idx_snapshot_schedule_started_at").using("btree", table.startedAt.asc().nullsLast().op("timestamptz_ops")),
index("idx_snapshot_schedule_status").using("btree", table.status.asc().nullsLast().op("text_ops")),
index("idx_snapshot_schedule_status_type_aid").using("btree", table.status.asc().nullsLast().op("int8_ops"), table.type.asc().nullsLast().op("int8_ops"), table.aid.asc().nullsLast().op("text_ops")),
index("idx_snapshot_schedule_type").using("btree", table.type.asc().nullsLast().op("text_ops")),
index("idx_snapshot_schedule_aid_status_type").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.status.asc().nullsLast().op("text_ops"), table.type.asc().nullsLast().op("text_ops")),
index("idx_snapshot_schedule_status_started_at").using("btree", table.status.asc().nullsLast().op("timestamptz_ops"), table.startedAt.asc().nullsLast().op("text_ops")),
uniqueIndex("snapshot_schedule_pkey").using("btree", table.id.asc().nullsLast().op("int8_ops")),
check("snapshot_schedule_id_not_null", sql`NOT NULL id`),
check("snapshot_schedule_aid_not_null", sql`NOT NULL aid`),
check("snapshot_schedule_created_at_not_null", sql`NOT NULL created_at`),
check("snapshot_schedule_status_not_null", sql`NOT NULL status`),
]);
export const eta = pgTable("eta", {
export const relationSinger = pgTable("relation_singer", {
id: integer().default(sql`nextval('relation_singer_id_seq'::regclass)`).primaryKey().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
aid: bigint({ mode: "number" }).primaryKey().notNull(),
eta: real().notNull(),
speed: real().notNull(),
currentViews: integer("current_views").notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(),
}, (table) => [
index("idx_eta_eta").using("btree", table.eta.asc().nullsLast().op("float4_ops")),
]);
export const loginSessionsInCredentials = credentials.table("login_sessions", {
id: text().notNull(),
uid: integer().notNull(),
songId: bigint("song_id", { mode: "number" }).notNull(),
singerId: integer("singer_id").notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
expireAt: timestamp("expire_at", { withTimezone: true, mode: 'string' }),
lastUsedAt: timestamp("last_used_at", { withTimezone: true, mode: 'string' }),
ipAddress: inet("ip_address"),
userAgent: text("user_agent"),
deactivatedAt: timestamp("deactivated_at", { withTimezone: true, mode: 'string' }),
}, (table) => [
index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")),
uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")),
]);
// User accounts (schema: credentials.users).
export const usersInCredentials = credentials.table("users", {
// Serial id backed by credentials.users_id_seq.
id: integer().default(sql`nextval('credentials.users_id_seq'::regclass)`).notNull(),
nickname: text(),
username: text().notNull(),
// Password hash (verified via Argon2id in verifyUser) — never plaintext.
password: text().notNull(),
// Stable public identifier (unq_id), unique per user.
unqId: text("unq_id").notNull(),
role: userRoleInCredentials().default('USER').notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
// NOTE(review): users_pkey and users_pkey1 index the same column (id) —
// likely an introspection-pull artifact; confirm and drop one in a migration.
uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")),
uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")),
uniqueIndex("users_unq_id_key").using("btree", table.unqId.asc().nullsLast().op("text_ops")),
uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")),
]);
export const relationsProducer = pgTable("relations_producer", {
id: integer().default(sql`nextval('relations_producer_id_seq'::regclass)`).primaryKey().notNull(),
songId: integer("song_id").notNull(),
createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
producerId: integer("producer_id").notNull(),
updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
foreignKey({
columns: [table.singerId],
foreignColumns: [singer.id],
name: "fkey_singer_id"
}),
foreignKey({
columns: [table.songId],
foreignColumns: [songs.id],
name: "fkey_relations_producer_songs_id"
name: "fkey_song_id"
}),
check("relation_singer_id_not_null", sql`NOT NULL id`),
check("relation_singer_song_id_not_null", sql`NOT NULL song_id`),
check("relation_singer_singer_id_not_null", sql`NOT NULL singer_id`),
check("relation_singer_created_at_not_null", sql`NOT NULL created_at`),
check("relation_singer_updated_at_not_null", sql`NOT NULL updated_at`),
]);
// Generic relation edges between arbitrary entities; source_type/target_type
// discriminate what the bigint ids refer to.
export const relations = pgTable("relations", {
id: integer().notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
sourceId: bigint("source_id", { mode: "number" }).notNull(),
sourceType: text("source_type").notNull(),
// You can use { mode: "bigint" } if numbers are exceeding js number limitations
targetId: bigint("target_id", { mode: "number" }).notNull(),
targetType: text("target_type").notNull(),
// Label describing how source relates to target.
relation: text().notNull(),
createdAt: timestamp("created_at", { precision: 6, withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
updatedAt: timestamp("updated_at", { precision: 6, withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(),
}, (table) => [
// NOTE(review): `NOT NULL <col>` is not a valid CHECK expression — these look
// like artifacts of a drizzle-kit introspection pull; verify against live schema.
check("relations_id_not_null", sql`NOT NULL id`),
check("relations_source_id_not_null", sql`NOT NULL source_id`),
check("relations_source_type_not_null", sql`NOT NULL source_type`),
check("relations_target_id_not_null", sql`NOT NULL target_id`),
check("relations_target_type_not_null", sql`NOT NULL target_type`),
check("relations_relation_not_null", sql`NOT NULL relation`),
check("relations_created_at_not_null", sql`NOT NULL created_at`),
check("relations_updated_at_not_null", sql`NOT NULL updated_at`),
]);

View File

@ -2,3 +2,4 @@ export * from "./math";
export * from "./milestone";
export * from "./randomID";
export * from "./time";
export * from "./type";

View File

@ -0,0 +1 @@
export type PartialBy<T, K extends keyof T> = Omit<T, K> & Partial<Pick<T, K>>;

View File

@ -1,67 +1,95 @@
import type { Psql } from "@core/db/psql.d";
import { BiliUserType, BiliVideoMetadataType } from "@core/db/schema";
import {
bilibiliMetadata,
BilibiliMetadataType,
bilibiliUser,
db,
labellingResult
} from "@core/drizzle";
import { AkariModelVersion } from "ml/const";
import { eq, isNull } from "drizzle-orm";
import { PartialBy } from "@core/lib";
export async function videoExistsInAllData(sql: Psql, aid: number) {
const rows = await sql<{ exists: boolean }[]>`
SELECT EXISTS(SELECT 1 FROM bilibili_metadata WHERE aid = ${aid})
`;
return rows[0].exists;
// Inserts one bilibili_metadata row; id/createdAt/status may be omitted by
// callers and fall back to their column defaults.
export async function insertIntoMetadata(
data: PartialBy<BilibiliMetadataType, "id" | "createdAt" | "status">
) {
await db.insert(bilibiliMetadata).values(data);
}
export async function userExistsInBiliUsers(sql: Psql, uid: number) {
const rows = await sql<{ exists: boolean }[]>`
SELECT EXISTS(SELECT 1 FROM bilibili_user WHERE uid = ${uid})
`;
return rows[0].exists;
/**
 * Checks whether a bilibili_metadata row already exists for the given aid.
 * @param aid - Bilibili video aid.
 * @returns true when at least one matching row exists.
 */
export async function videoExistsInAllData(aid: number) {
	const [firstMatch] = await db
		.select({ id: bilibiliMetadata.id })
		.from(bilibiliMetadata)
		.where(eq(bilibiliMetadata.aid, aid))
		.limit(1);
	return firstMatch !== undefined;
}
export async function getUnlabelledVideos(sql: Psql) {
const rows = await sql<{ aid: number }[]>`
SELECT a.aid FROM bilibili_metadata a LEFT JOIN labelling_result l ON a.aid = l.aid WHERE l.aid IS NULL
`;
/**
 * Checks whether a bilibili_user row already exists for the given uid.
 * @param uid - Bilibili user id.
 * @returns true when at least one matching row exists.
 */
export async function userExistsInBiliUsers(uid: number) {
	const [firstMatch] = await db
		.select({ id: bilibiliUser.id })
		.from(bilibiliUser)
		.where(eq(bilibiliUser.uid, uid))
		.limit(1);
	return firstMatch !== undefined;
}
/**
 * Lists aids of videos that have no labelling_result row yet
 * (anti-join via LEFT JOIN + IS NULL).
 * @returns Array of unlabelled video aids.
 */
export async function getUnlabelledVideos() {
	const unlabelled = await db
		.select({ aid: bilibiliMetadata.aid })
		.from(bilibiliMetadata)
		.leftJoin(labellingResult, eq(bilibiliMetadata.aid, labellingResult.aid))
		.where(isNull(labellingResult.aid));
	return unlabelled.map(({ aid }) => aid);
}
export async function insertVideoLabel(sql: Psql, aid: number, label: number) {
await sql`
INSERT INTO labelling_result (aid, label, model_version) VALUES (${aid}, ${label}, ${AkariModelVersion}) ON CONFLICT (aid, model_version) DO NOTHING
`;
/**
 * Records a classification label for a video under the current model version.
 * Idempotent: an existing (aid, model_version) row is left untouched.
 * @param aid - Bilibili video aid.
 * @param label - Classifier output label.
 */
export async function insertVideoLabel(aid: number, label: number) {
	const row = { aid, label, modelVersion: AkariModelVersion };
	await db
		.insert(labellingResult)
		.values(row)
		.onConflictDoNothing({ target: [labellingResult.aid, labellingResult.modelVersion] });
}
export async function getVideoInfoFromAllData(sql: Psql, aid: number) {
const rows = await sql<BiliVideoMetadataType[]>`
SELECT * FROM bilibili_metadata WHERE aid = ${aid}
`;
const row = rows[0];
let authorInfo = "";
if (row.uid && (await userExistsInBiliUsers(sql, row.uid))) {
const userRows = await sql<BiliUserType[]>`
SELECT * FROM bilibili_user WHERE uid = ${row.uid}
`;
const userRow = userRows[0];
if (userRow) {
authorInfo = userRow.desc;
}
export async function getVideoInfoFromAllData(aid: number) {
const rows = await db
.select()
.from(bilibiliMetadata)
.where(eq(bilibiliMetadata.aid, aid))
.limit(1);
if (rows.length === 0) {
return null;
}
const row = rows[0];
return {
title: row.title,
description: row.description,
tags: row.tags,
author_info: authorInfo
tags: row.tags
};
}
export async function setBiliVideoStatus(sql: Psql, aid: number, status: number) {
await sql`
UPDATE bilibili_metadata SET status = ${status} WHERE aid = ${aid}
`;
/**
 * Persists a new status value for the given video.
 * @param aid - Bilibili video aid.
 * @param status - New status code to store.
 */
export async function setBiliVideoStatus(aid: number, status: number) {
	await db
		.update(bilibiliMetadata)
		.set({ status })
		.where(eq(bilibiliMetadata.aid, aid));
}
export async function getBiliVideoStatus(sql: Psql, aid: number) {
const rows = await sql<{ status: number }[]>`
SELECT status FROM bilibili_metadata WHERE aid = ${aid}
`;
// Returns the stored status code for a video; unknown aids map to 0.
export async function getBiliVideoStatus(aid: number) {
const rows = await db
.select({ status: bilibiliMetadata.status })
.from(bilibiliMetadata)
.where(eq(bilibiliMetadata.aid, aid))
.limit(1);
// Missing rows are treated as status 0 — presumably "unprocessed"; confirm
// this matches callers' expectations.
if (rows.length === 0) return 0;
return rows[0].status;
}

View File

@ -1,30 +1,56 @@
import { LatestSnapshotType } from "@core/db/schema";
import { SnapshotNumber } from "mq/task/getVideoStats";
import type { Psql } from "@core/db/psql.d";
import {
db,
eta,
latestVideoSnapshot as lv,
songs,
videoSnapshot,
VideoSnapshotType
} from "@core/drizzle";
import { PartialBy } from "@core/lib";
import { and, eq, getTableColumns, gte, lte, lt, or } from "drizzle-orm";
import { union } from "drizzle-orm/pg-core";
import { sql } from "drizzle-orm";
export async function getVideosNearMilestone(sql: Psql) {
const queryResult = await sql<LatestSnapshotType[]>`
SELECT ls.*
FROM latest_video_snapshot ls
RIGHT JOIN songs ON songs.aid = ls.aid
WHERE
(views >= 60000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
views > 1000000
UNION
SELECT ls.*
FROM latest_video_snapshot ls
WHERE
(views >= 90000 AND views < 100000) OR
(views >= 900000 AND views < 1000000) OR
(views >= CEIL(views::float/1000000::float)*1000000-100000 AND views < CEIL(views::float/1000000::float)*1000000)
UNION
SELECT ls.*
FROM latest_video_snapshot ls
JOIN eta ON eta.aid = ls.aid
WHERE eta.eta < 2300
`;
return queryResult.map((row) => {
// Inserts one video_snapshot row; the serial id may be omitted by callers.
export async function insertVideoSnapshot(data: PartialBy<VideoSnapshotType, "id">) {
await db.insert(videoSnapshot).values(data);
}
export async function getVideosNearMilestone() {
const results = await union(
db
.select({ ...getTableColumns(lv) })
.from(lv)
.rightJoin(songs, eq(lv.aid, songs.aid))
.where(
or(
and(gte(lv.views, 60000), lt(lv.views, 100000)),
and(gte(lv.views, 900000), lt(lv.views, 1000000)),
gte(lv.views, 1000000)
)
),
db
.select({ ...getTableColumns(lv) })
.from(lv)
.where(
or(
and(gte(lv.views, 60000), lt(lv.views, 100000)),
and(gte(lv.views, 900000), lt(lv.views, 1000000)),
and(
sql`views >= CEIL(views::float/1000000::float)*1000000-100000`,
sql`views < CEIL(views::float/1000000::float)*1000000)`
)
)
),
db
.select({ ...getTableColumns(lv) })
.from(lv)
.innerJoin(eta, eq(lv.aid, eta.aid))
.where(lte(eta.eta, 2300))
);
return results.map((row) => {
return {
...row,
aid: Number(row.aid)
@ -32,7 +58,10 @@ export async function getVideosNearMilestone(sql: Psql) {
});
}
export async function getLatestVideoSnapshot(sql: Psql, aid: number): Promise<null | SnapshotNumber> {
export async function getLatestVideoSnapshot(
sql: Psql,
aid: number
): Promise<null | SnapshotNumber> {
const queryResult = await sql<LatestSnapshotType[]>`
SELECT *
FROM latest_video_snapshot

View File

@ -3,63 +3,63 @@ import logger from "@core/log";
import { WorkerError } from "mq/schema";
class AkariAPI {
private readonly serviceReady: Promise<boolean>;
private readonly serviceReady: Promise<boolean>;
constructor() {
// Wait for the ML API service to be ready on startup
this.serviceReady = apiManager.waitForService();
}
constructor() {
// Wait for the ML API service to be ready on startup
this.serviceReady = apiManager.waitForService();
}
public async init(): Promise<void> {
const isReady = await this.serviceReady;
if (!isReady) {
throw new WorkerError(
new Error("ML API service failed to become ready"),
"ml",
"fn:init"
);
}
logger.log("Akari API initialized successfully", "ml");
}
public async init(): Promise<void> {
const isReady = await this.serviceReady;
if (!isReady) {
throw new WorkerError(
new Error("ML API service failed to become ready"),
"ml",
"fn:init"
);
}
logger.log("Akari API initialized successfully", "ml");
}
public async classifyVideo(
title: string,
description: string,
tags: string,
aid?: number
): Promise<number> {
try {
// Ensure service is ready
await this.serviceReady;
const label = await apiManager.classifyVideo(title, description, tags, aid);
return label;
} catch (error) {
logger.error(`Classification failed for aid ${aid}: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideo");
}
}
public async classifyVideo(
title: string,
description: string,
tags: string,
aid?: number
): Promise<number> {
try {
// Ensure service is ready
await this.serviceReady;
public async classifyVideosBatch(
videos: Array<{ title: string; description: string; tags: string; aid?: number }>
): Promise<Array<{ aid?: number; label: number; probabilities: number[] }>> {
try {
// Ensure service is ready
await this.serviceReady;
const results = await apiManager.classifyVideosBatch(videos);
return results;
} catch (error) {
logger.error(`Batch classification failed: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch");
}
}
const label = await apiManager.classifyVideo(title, description, tags, aid);
return label;
} catch (error) {
logger.error(`Classification failed for aid ${aid}: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideo");
}
}
public async healthCheck(): Promise<boolean> {
return await apiManager.healthCheck();
}
public async classifyVideosBatch(
videos: Array<{ title: string; description: string; tags: string; aid?: number }>
): Promise<Array<{ aid?: number; label: number; probabilities: number[] }>> {
try {
// Ensure service is ready
await this.serviceReady;
const results = await apiManager.classifyVideosBatch(videos);
return results;
} catch (error) {
logger.error(`Batch classification failed: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch");
}
}
public async healthCheck(): Promise<boolean> {
return await apiManager.healthCheck();
}
}
// Create a singleton instance
const Akari = new AkariAPI();
export default Akari;
export default Akari;

View File

@ -2,145 +2,145 @@ import logger from "@core/log";
import { WorkerError } from "mq/schema";
interface ClassificationRequest {
title: string;
description: string;
tags: string;
aid?: number;
title: string;
description: string;
tags: string;
aid?: number;
}
interface ClassificationResponse {
label: number;
probabilities: number[];
aid?: number;
label: number;
probabilities: number[];
aid?: number;
}
interface HealthResponse {
status: string;
models_loaded: boolean;
status: string;
models_loaded: boolean;
}
export class APIManager {
private readonly baseUrl: string;
private readonly timeout: number;
private readonly baseUrl: string;
private readonly timeout: number;
constructor(baseUrl: string = "http://localhost:8544", timeout: number = 30000) {
this.baseUrl = baseUrl;
this.timeout = timeout;
}
constructor(baseUrl: string = "http://localhost:8544", timeout: number = 30000) {
this.baseUrl = baseUrl;
this.timeout = timeout;
}
public async healthCheck(): Promise<boolean> {
try {
const response = await fetch(`${this.baseUrl}/health`, {
method: 'GET',
headers: {
'Content-Type': 'application/json',
},
signal: AbortSignal.timeout(this.timeout),
});
public async healthCheck(): Promise<boolean> {
try {
const response = await fetch(`${this.baseUrl}/health`, {
method: "GET",
headers: {
"Content-Type": "application/json"
},
signal: AbortSignal.timeout(this.timeout)
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data: HealthResponse = await response.json();
return data.models_loaded;
} catch (error) {
logger.error(`Health check failed: ${error}`, "ml");
return false;
}
}
const data: HealthResponse = await response.json();
return data.models_loaded;
} catch (error) {
logger.error(`Health check failed: ${error}`, "ml");
return false;
}
}
public async classifyVideo(
title: string,
description: string,
tags: string,
aid?: number
): Promise<number> {
const request: ClassificationRequest = {
title: title.trim() || "untitled",
description: description.trim() || "N/A",
tags: tags.trim() || "empty",
aid: aid
};
public async classifyVideo(
title: string,
description: string,
tags: string,
aid?: number
): Promise<number> {
const request: ClassificationRequest = {
title: title.trim() || "untitled",
description: description.trim() || "N/A",
tags: tags.trim() || "empty",
aid: aid
};
try {
const response = await fetch(`${this.baseUrl}/classify`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(request),
signal: AbortSignal.timeout(this.timeout),
});
try {
const response = await fetch(`${this.baseUrl}/classify`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(request),
signal: AbortSignal.timeout(this.timeout)
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const data: ClassificationResponse = await response.json();
if (aid) {
logger.log(
`Prediction result for aid: ${aid}: [${data.probabilities.map((p) => p.toFixed(5))}]`,
"ml"
);
}
const data: ClassificationResponse = await response.json();
return data.label;
} catch (error) {
logger.error(`Classification failed for aid ${aid}: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideo");
}
}
if (aid) {
logger.log(
`Prediction result for aid: ${aid}: [${data.probabilities.map((p) => p.toFixed(5))}]`,
"ml"
);
}
public async classifyVideosBatch(
requests: Array<{ title: string; description: string; tags: string; aid?: number }>
): Promise<Array<{ aid?: number; label: number; probabilities: number[] }>> {
try {
const response = await fetch(`${this.baseUrl}/classify_batch`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(requests),
signal: AbortSignal.timeout(this.timeout * 2), // Longer timeout for batch
});
return data.label;
} catch (error) {
logger.error(`Classification failed for aid ${aid}: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideo");
}
}
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
public async classifyVideosBatch(
requests: Array<{ title: string; description: string; tags: string; aid?: number }>
): Promise<Array<{ aid?: number; label: number; probabilities: number[] }>> {
try {
const response = await fetch(`${this.baseUrl}/classify_batch`, {
method: "POST",
headers: {
"Content-Type": "application/json"
},
body: JSON.stringify(requests),
signal: AbortSignal.timeout(this.timeout * 2) // Longer timeout for batch
});
const data = await response.json();
return data.results;
} catch (error) {
logger.error(`Batch classification failed: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch");
}
}
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
public async waitForService(timeoutMs: number = 60000): Promise<boolean> {
const startTime = Date.now();
const checkInterval = 2000; // Check every 2 seconds
const data = await response.json();
return data.results;
} catch (error) {
logger.error(`Batch classification failed: ${error}`, "ml");
throw new WorkerError(error as Error, "ml", "fn:classifyVideosBatch");
}
}
while (Date.now() - startTime < timeoutMs) {
try {
const isHealthy = await this.healthCheck();
if (isHealthy) {
logger.log("ML API service is healthy", "ml");
return true;
}
} catch (error) {
// Service not ready yet, continue waiting
}
public async waitForService(timeoutMs: number = 60000): Promise<boolean> {
const startTime = Date.now();
const checkInterval = 2000; // Check every 2 seconds
await new Promise(resolve => setTimeout(resolve, checkInterval));
}
while (Date.now() - startTime < timeoutMs) {
try {
const isHealthy = await this.healthCheck();
if (isHealthy) {
logger.log("ML API service is healthy", "ml");
return true;
}
} catch (error) {
// Service not ready yet, continue waiting
}
logger.error("ML API service did not become ready within timeout", "ml");
return false;
}
await new Promise((resolve) => setTimeout(resolve, checkInterval));
}
logger.error("ML API service did not become ready within timeout", "ml");
return false;
}
}
// Create a singleton instance
const apiManager = new APIManager();
export default apiManager;
export default apiManager;

View File

@ -22,7 +22,11 @@ export class AIManager {
public getModelSession(key: string): ort.InferenceSession {
if (this.sessions[key] === undefined) {
throw new WorkerError(new Error(`Model ${key} not found / not initialized.`), "ml", "fn:getModelSession");
throw new WorkerError(
new Error(`Model ${key} not found / not initialized.`),
"ml",
"fn:getModelSession"
);
}
return this.sessions[key];
}

View File

@ -1,5 +1,9 @@
import { Job } from "bullmq";
import { getUnlabelledVideos, getVideoInfoFromAllData, insertVideoLabel } from "../../db/bilibili_metadata";
import {
getUnlabelledVideos,
getVideoInfoFromAllData,
insertVideoLabel
} from "../../db/bilibili_metadata";
import Akari from "ml/akari_api";
import { ClassifyVideoQueue } from "mq/index";
import logger from "@core/log";
@ -16,20 +20,23 @@ export const classifyVideoWorker = async (job: Job) => {
return 3;
}
const videoInfo = await getVideoInfoFromAllData(sql, aid);
const title = videoInfo.title?.trim() || "untitled";
const description = videoInfo.description?.trim() || "N/A";
const videoInfo = await getVideoInfoFromAllData(aid);
if (!videoInfo) {
return 3;
}
const title = videoInfo.title.trim();
const description = videoInfo.description.trim();
const tags = videoInfo.tags?.trim() || "empty";
const label = await Akari.classifyVideo(title, description, tags, aid);
if (label == -1) {
logger.warn(`Failed to classify video ${aid}`, "ml");
}
await insertVideoLabel(sql, aid, label);
await insertVideoLabel(aid, label);
const exists = await aidExistsInSongs(sql, aid);
if (!exists && label !== 0) {
await scheduleSnapshot(sql, aid, "new", Date.now() + 10 * MINUTE, true);
await insertIntoSongs(sql, aid);
await insertIntoSongs(aid);
}
await job.updateData({
@ -48,7 +55,7 @@ export const classifyVideosWorker = async () => {
await lockManager.acquireLock("classifyVideos", 5 * 60);
const videos = await getUnlabelledVideos(sql);
const videos = await getUnlabelledVideos();
logger.log(`Found ${videos.length} unlabelled videos`);
const startTime = new Date().getTime();

View File

@ -2,17 +2,19 @@ import { queueJobsCounter } from "metrics";
import { SnapshotQueue } from "mq";
export const collectQueueMetrics = async () => {
const counts = await SnapshotQueue.getJobCounts();
const counts = await SnapshotQueue.getJobCounts();
const waiting = counts?.waiting;
const prioritized = counts?.prioritized;
const active = counts?.active;
const completed = counts?.completed;
const failed = counts?.failed;
const delayed = counts?.delayed;
waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" });
prioritized && queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" });
active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" });
completed && queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" });
failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" });
delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" });
}
const active = counts?.active;
const completed = counts?.completed;
const failed = counts?.failed;
const delayed = counts?.delayed;
waiting && queueJobsCounter.record(waiting, { queueName: "SnapshotQueue", status: "waiting" });
prioritized &&
queueJobsCounter.record(prioritized, { queueName: "SnapshotQueue", status: "prioritized" });
active && queueJobsCounter.record(active, { queueName: "SnapshotQueue", status: "active" });
completed &&
queueJobsCounter.record(completed, { queueName: "SnapshotQueue", status: "completed" });
failed && queueJobsCounter.record(failed, { queueName: "SnapshotQueue", status: "failed" });
delayed && queueJobsCounter.record(delayed, { queueName: "SnapshotQueue", status: "delayed" });
};

View File

@ -4,14 +4,14 @@ import { sql } from "@core/db/dbNew";
import { lockManager } from "@core/mq/lockManager";
export const directSnapshotWorker = async (job: Job): Promise<void> => {
const lock = await lockManager.isLocked(`directSnapshot-${job.data.aid}`);
if (lock) {
return;
}
const lock = await lockManager.isLocked(`directSnapshot-${job.data.aid}`);
if (lock) {
return;
}
const aid = job.data.aid;
if (!aid) {
throw new Error("aid does not exists");
}
await insertVideoSnapshot(sql, aid, "snapshotMilestoneVideo");
await lockManager.acquireLock(`directSnapshot-${job.data.aid}`, 75);
await lockManager.acquireLock(`directSnapshot-${job.data.aid}`, 75);
};

View File

@ -1,7 +1,10 @@
import { Job } from "bullmq";
import { getLatestVideoSnapshot } from "db/snapshot";
import { truncate } from "utils/truncate";
import { getVideosWithoutActiveSnapshotScheduleByType, scheduleSnapshot } from "db/snapshotSchedule";
import {
getVideosWithoutActiveSnapshotScheduleByType,
scheduleSnapshot
} from "db/snapshotSchedule";
import logger from "@core/log";
import { HOUR, MINUTE, WEEK } from "@core/lib";
import { lockManager } from "@core/mq/lockManager";
@ -25,7 +28,11 @@ export const dispatchRegularSnapshotsWorker = async (_job: Job): Promise<void> =
const lastSnapshotedAt = latestSnapshot?.time ?? now;
const interval = await getRegularSnapshotInterval(sql, aid);
logger.log(`Scheduled regular snapshot for aid ${aid} in ${interval} hours.`, "mq");
const targetTime = truncate(lastSnapshotedAt + interval * HOUR, now + 1, now + 100000 * WEEK);
const targetTime = truncate(
lastSnapshotedAt + interval * HOUR,
now + 1,
now + 100000 * WEEK
);
await scheduleSnapshot(sql, aid, "normal", targetTime);
if (now - startedAt > 25 * MINUTE) {
return;

View File

@ -1,14 +1,109 @@
import { Job } from "bullmq";
import { insertVideoInfo } from "mq/task/getVideoDetails";
import { getVideoDetails } from "net/getVideoDetails";
import logger from "@core/log";
import { sql } from "@core/db/dbNew";
import { ClassifyVideoQueue, latestVideosEventsProducer } from "mq/index";
import {
insertIntoMetadata,
userExistsInBiliUsers,
videoExistsInAllData
} from "db/bilibili_metadata";
import { insertIntoSongs } from "mq/task/collectSongs";
import { bilibiliUser, db, videoSnapshot } from "@core/drizzle";
import { eq } from "drizzle-orm";
import { GetVideoInfoJobData } from "mq/schema";
export const getVideoInfoWorker = async (job: Job): Promise<void> => {
interface AddSongEventPayload {
eventName: string;
uid: string;
songID: number;
}
// Publishes an "addSong" event on the latestVideos queue-events stream so
// downstream consumers can react to a song import.
// NOTE(review): callers pass job.data.uid, which is optional on
// GetVideoInfoJobData — confirm consumers tolerate an undefined uid.
const publishAddsongEvent = async (songID: number, uid: string) =>
latestVideosEventsProducer.publishEvent<AddSongEventPayload>({
eventName: "addSong",
uid: uid,
songID: songID
});
/**
 * Fetches video details from Bilibili and persists metadata, author info and an
 * initial snapshot; optionally registers the video as a song and emits an
 * "addSong" event.
 * Fixes: tag filter used `!arr.indexOf(x)` (matches only index 0, i.e. kept
 * ONLY "old_channel" tags) → `!includes()`; pubdate is Unix seconds but was
 * passed to Date() as milliseconds; user-existence check was given the video
 * aid instead of the author uid; `return null` in a Promise<void> function;
 * the update branch lost the fans refresh the pre-drizzle code performed.
 * @param job - Job carrying { aid, insertSongs?, uid? }.
 */
export const getVideoInfoWorker = async (job: Job<GetVideoInfoJobData>): Promise<void> => {
	const aid = job.data.aid;
	const insertSongs = job.data.insertSongs || false;
	if (!aid) {
		logger.warn("aid does not exists", "mq", "job:getVideoInfo");
		return;
	}
	const videoExists = await videoExistsInAllData(aid);
	if (videoExists && !insertSongs) {
		return;
	}
	if (videoExists && insertSongs) {
		const songs = await insertIntoSongs(aid);
		if (songs.length === 0) {
			logger.warn(`Failed to insert song for aid: ${aid}`, "mq", "fn:getVideoInfoWorker");
			return;
		}
		await publishAddsongEvent(songs[0].id, job.data.uid);
		return;
	}
	const data = await getVideoDetails(aid);
	if (data === null) {
		// Nothing fetched; function returns Promise<void> (was "return null").
		return;
	}
	const uid = data.View.owner.mid;
	await insertIntoMetadata({
		aid,
		bvid: data.View.bvid,
		description: data.View.desc,
		uid: uid,
		// Drop channel/topic pseudo-tags; keep only real content tags.
		tags: data.Tags.filter((tag) => !["old_channel", "topic"].includes(tag.tag_type))
			.map((tag) => tag.tag_name)
			.join(","),
		title: data.View.title,
		// pubdate is in Unix seconds; Date() expects milliseconds.
		publishedAt: new Date(data.View.pubdate * 1000).toISOString(),
		duration: data.View.duration,
		coverUrl: data.View.pic
	});
	// Existence must be checked against the author's uid, not the video aid.
	const userExists = await userExistsInBiliUsers(uid);
	if (!userExists) {
		await db.insert(bilibiliUser).values({
			uid,
			username: data.View.owner.name,
			desc: data.Card.card.sign,
			fans: data.Card.follower
		});
	} else {
		await db
			.update(bilibiliUser)
			// Also refresh the follower count, as the pre-drizzle code did.
			.set({
				username: data.View.owner.name,
				desc: data.Card.card.sign,
				fans: data.Card.follower
			})
			.where(eq(bilibiliUser.uid, uid));
	}
	const stat = data.View.stat;
	await db.insert(videoSnapshot).values({
		aid,
		views: stat.view,
		danmakus: stat.danmaku,
		replies: stat.reply,
		likes: stat.like,
		coins: stat.coin,
		shares: stat.share,
		favorites: stat.favorite
	});
	logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
	if (!insertSongs) {
		await ClassifyVideoQueue.add("classifyVideo", { aid });
		return;
	}
	const songs = await insertIntoSongs(aid);
	if (songs.length === 0) {
		logger.warn(`Failed to insert song for aid: ${aid}`, "mq", "fn:getVideoInfoWorker");
		return;
	}
	await publishAddsongEvent(songs[0].id, job.data.uid);
};

View File

@ -1,5 +1,5 @@
import { Queue, ConnectionOptions } from "bullmq";
import { redis } from "@core/db/redis";
import { Queue, ConnectionOptions, QueueEventsProducer } from "bullmq";
import { redis } from "bun";
export const LatestVideosQueue = new Queue("latestVideos", {
connection: redis as ConnectionOptions
@ -15,4 +15,8 @@ export const SnapshotQueue = new Queue("snapshot", {
export const MiscQueue = new Queue("misc", {
connection: redis as ConnectionOptions
});
});
export const latestVideosEventsProducer = new QueueEventsProducer("latestVideos", {
connection: redis as ConnectionOptions
});

View File

@ -10,3 +10,9 @@ export class WorkerError extends Error {
this.rawError = rawError;
}
}
export interface GetVideoInfoJobData {
aid: number;
insertSongs?: boolean;
uid?: string;
}

View File

@ -4,49 +4,53 @@ import logger from "@core/log";
import { scheduleSnapshot } from "db/snapshotSchedule";
import { MINUTE } from "@core/lib";
import type { Psql } from "@core/db/psql.d";
import { db, songs } from "@core/drizzle";
import { and, eq, sql as drizzleSQL } from "drizzle-orm";
/**
 * Imports every not-yet-collected video into the songs table and schedules an
 * initial snapshot for each.
 * Fix: removed a leftover pre-drizzle call `insertIntoSongs(sql, aid)` that
 * duplicated the insert alongside the drizzle variant.
 */
export async function collectSongs() {
	const aids = await getNotCollectedSongs(sql);
	for (const aid of aids) {
		const exists = await aidExistsInSongs(sql, aid);
		if (exists) continue;
		await insertIntoSongs(aid);
		await scheduleSnapshot(sql, aid, "new", Date.now() + 10 * MINUTE, true);
		logger.log(`Video ${aid} was added into the songs table.`, "mq", "fn:collectSongs");
	}
}
export async function insertIntoSongs(sql: Psql, aid: number) {
const songExistsAndDeleted = await sql`
SELECT EXISTS (
SELECT 1
FROM songs
WHERE aid = ${aid}
AND deleted = true
);
`;
if (songExistsAndDeleted[0].exists) {
await sql`
UPDATE songs
SET deleted = false
WHERE aid = ${aid}
`;
/**
 * Inserts a video into songs, or un-deletes it when a soft-deleted row exists.
 * Publish date, duration, cover and producer name are pulled from
 * bilibili_metadata / bilibili_user via scalar subqueries.
 * Fix: each SELECT fragment must be parenthesized to form a valid scalar
 * subquery inside VALUES (...); the unwrapped form is a SQL syntax error.
 * @param aid - Bilibili video aid.
 * @returns Inserted/updated rows (empty when ON CONFLICT skipped the insert).
 */
export async function insertIntoSongs(aid: number) {
	const song = await db
		.select({ id: songs.id })
		.from(songs)
		.where(and(eq(songs.aid, aid), eq(songs.deleted, true)))
		.limit(1);
	const songExistsAndDeleted = song.length > 0;
	if (songExistsAndDeleted) {
		// Soft-deleted row: just flip the flag back and return the revived row.
		const data = await db
			.update(songs)
			.set({ deleted: false })
			.where(eq(songs.id, song[0].id))
			.returning();
		return data;
	}
	const data = await db
		.insert(songs)
		.values({
			aid,
			publishedAt: drizzleSQL`(SELECT published_at FROM bilibili_metadata WHERE aid = ${aid})`,
			duration: drizzleSQL`(SELECT duration FROM bilibili_metadata WHERE aid = ${aid})`,
			image: drizzleSQL`(SELECT cover_url FROM bilibili_metadata WHERE aid = ${aid})`,
			producer: drizzleSQL`(
				SELECT username
				FROM bilibili_user bu
				JOIN bilibili_metadata bm
				ON bm.uid = bu.uid
				WHERE bm.aid = ${aid}
			)`
		})
		.onConflictDoNothing()
		.returning();
	return data;
}

View File

@ -1,72 +0,0 @@
import { getVideoDetails } from "net/getVideoDetails";
import { formatTimestampToPsql } from "utils/formatTimestampToPostgre";
import logger from "@core/log";
import { ClassifyVideoQueue } from "mq/index";
import { userExistsInBiliUsers, videoExistsInAllData } from "db/bilibili_metadata";
import { HOUR, SECOND } from "@core/lib";
import type { Psql } from "@core/db/psql.d";
import { insertIntoSongs } from "./collectSongs";
/**
 * Fetch full metadata for a bilibili video and persist it.
 *
 * Flow:
 *  1. If the video is already in `bilibili_metadata`, either return early or
 *     (when `insertSongs` is set) just register it in `songs` and return.
 *  2. Otherwise fetch details from the bilibili API, insert the video row,
 *     upsert the uploader's `bilibili_user` row, and record an initial
 *     `video_snapshot` of its stats.
 *  3. Finally either enqueue the video for classification or insert it into
 *     `songs` directly, depending on `insertSongs`.
 *
 * @param sql - postgres client used for all queries
 * @param aid - bilibili video id (av number)
 * @param insertSongs - when true, skip the classify queue and insert straight into `songs`
 * @returns undefined on success / already-present; null when the API returned no data
 */
export async function insertVideoInfo(sql: Psql, aid: number, insertSongs = false) {
	const videoExists = await videoExistsInAllData(sql, aid);
	if (videoExists && !insertSongs) {
		return;
	}
	if (videoExists && insertSongs) {
		await insertIntoSongs(sql, aid);
		return;
	}
	const data = await getVideoDetails(aid);
	if (data === null) {
		// API error or missing video — caller can distinguish null from undefined.
		return null;
	}
	const bvid = data.View.bvid;
	const desc = data.View.desc;
	const uid = data.View.owner.mid;
	// BUG FIX: the original used `!["old_channel","topic"].indexOf(tag.tag_type)`,
	// which is truthy only when indexOf returns 0 — i.e. it KEPT only
	// "old_channel" tags and discarded everything else. The intent is to
	// exclude "old_channel" and "topic" tags; `.includes()` expresses that.
	const tags = data.Tags.filter((tag) => !["old_channel", "topic"].includes(tag.tag_type))
		.map((tag) => tag.tag_name)
		.join(",");
	const title = data.View.title;
	// NOTE(review): pubdate (epoch seconds) is shifted by +8h before formatting —
	// presumably converting UTC to Asia/Shanghai local time; confirm the
	// published_at column is meant to hold local time rather than UTC.
	const published_at = formatTimestampToPsql(data.View.pubdate * SECOND + 8 * HOUR);
	const duration = data.View.duration;
	const cover = data.View.pic;
	await sql`
		INSERT INTO bilibili_metadata (aid, bvid, description, uid, tags, title, published_at, duration, cover_url)
		VALUES (${aid}, ${bvid}, ${desc}, ${uid}, ${tags}, ${title}, ${published_at}, ${duration}, ${cover})
	`;
	// Insert the uploader on first sight; otherwise just refresh the fan count.
	const userExists = await userExistsInBiliUsers(sql, aid);
	if (!userExists) {
		await sql`
			INSERT INTO bilibili_user (uid, username, "desc", fans)
			VALUES (${uid}, ${data.View.owner.name}, ${data.Card.card.sign}, ${data.Card.follower})
		`;
	} else {
		await sql`
			UPDATE bilibili_user SET fans = ${data.Card.follower} WHERE uid = ${uid}
		`;
	}
	// Record the stats observed at insertion time as the first snapshot.
	const stat = data.View.stat;
	await sql`
		INSERT INTO video_snapshot (aid, views, danmakus, replies, likes, coins, shares, favorites)
		VALUES (
			${aid},
			${stat.view},
			${stat.danmaku},
			${stat.reply},
			${stat.like},
			${stat.coin},
			${stat.share},
			${stat.favorite}
		)
	`;
	logger.log(`Inserted video metadata for aid: ${aid}`, "mq");
	if (!insertSongs) {
		await ClassifyVideoQueue.add("classifyVideo", { aid });
		return;
	}
	await insertIntoSongs(sql, aid);
}

View File

@ -20,7 +20,7 @@ export async function queueLatestVideos(sql: Psql): Promise<number | null> {
let allExists = true;
let delay = 0;
for (const aid of aids) {
const videoExists = await videoExistsInAllData(sql, aid);
const videoExists = await videoExistsInAllData(aid);
if (videoExists) {
continue;
}

View File

@ -2,7 +2,10 @@ import type { VideoListResponse } from "@core/net/bilibili.d";
import logger from "@core/log";
import networkDelegate from "@core/net/delegate";
export async function getLatestVideoAids(page: number = 1, pageSize: number = 10): Promise<number[]> {
export async function getLatestVideoAids(
page: number = 1,
pageSize: number = 10
): Promise<number[]> {
const startFrom = 1 + pageSize * (page - 1);
const endTo = pageSize * page;
const range = `${startFrom}-${endTo}`;

View File

@ -2,9 +2,15 @@ import networkDelegate from "@core/net/delegate";
import type { VideoDetailsData, VideoDetailsResponse } from "@core/net/bilibili.d";
import logger from "@core/log";
export async function getVideoDetails(aid: number, archive: boolean = false): Promise<VideoDetailsData | null> {
export async function getVideoDetails(
aid: number,
archive: boolean = false
): Promise<VideoDetailsData | null> {
const url = `https://api.bilibili.com/x/web-interface/view/detail?aid=${aid}`;
const { data } = await networkDelegate.request<VideoDetailsResponse>(url, archive ? "" : "getVideoInfo");
const { data } = await networkDelegate.request<VideoDetailsResponse>(
url,
archive ? "" : "getVideoInfo"
);
const errMessage = `Error fetching metadata for ${aid}:`;
if (data.code !== 0) {
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");

View File

@ -12,7 +12,7 @@ const shutdown = async (signal: string, filterWorker: Worker<any, any, string>)
process.exit(0);
};
await Akari.init()
await Akari.init();
const filterWorker = new Worker(
"classifyVideo",

View File

@ -78,7 +78,7 @@ const snapshotWorker = new Worker(
"snapshot",
async (job: Job) => {
switch (job.name) {
case "directSnapshot":
case "directSnapshot":
return await directSnapshotWorker(job);
case "snapshotVideo":
return await snapshotVideoWorker(job);
@ -124,4 +124,4 @@ const miscWorker = new Worker(
miscWorker.on("error", (err) => {
const e = err as WorkerError;
logger.error(e.rawError, e.service, e.codePath);
});
});