update: getVideoTags with NetScheduler
improve: extracted PostgreSQL config
This commit is contained in:
parent
248978a3e8
commit
471a522d05
@ -1,22 +1,26 @@
|
|||||||
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Client, Transaction } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
import { AllDataType } from "lib/db/schema.d.ts";
|
import { AllDataType } from "lib/db/schema.d.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
export async function videoExistsInAllData(client: Client, aid: number) {
|
export async function videoExistsInAllData(client: Client, aid: number) {
|
||||||
return await client.queryObject<{ exists: boolean }>("SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)", [aid])
|
return await client.queryObject<{ exists: boolean }>(`SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1)`, [aid])
|
||||||
.then((result) => result.rows[0].exists);
|
.then((result) => result.rows[0].exists);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function insertIntoAllData(client: Client, data: AllDataType) {
|
export async function insertIntoAllData(client: Client, data: AllDataType) {
|
||||||
logger.log(`inserted ${data.aid}`, "db-all_data")
|
logger.log(`inserted ${data.aid}`, "db-all_data");
|
||||||
return await client.queryObject(
|
return await client.queryObject(
|
||||||
"INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (aid) DO NOTHING",
|
`INSERT INTO all_data (aid, bvid, description, uid, tags, title, published_at)
|
||||||
|
VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||||
|
ON CONFLICT (aid) DO NOTHING`,
|
||||||
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
|
[data.aid, data.bvid, data.description, data.uid, data.tags, data.title, data.published_at],
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getLatestVideoTimestampFromAllData(client: Client) {
|
export async function getLatestVideoTimestampFromAllData(client: Client) {
|
||||||
return await client.queryObject<{ published_at: string }>("SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1")
|
return await client.queryObject<{ published_at: string }>(
|
||||||
|
`SELECT published_at FROM all_data ORDER BY published_at DESC LIMIT 1`,
|
||||||
|
)
|
||||||
.then((result) => {
|
.then((result) => {
|
||||||
const date = new Date(result.rows[0].published_at);
|
const date = new Date(result.rows[0].published_at);
|
||||||
if (isNaN(date.getTime())) {
|
if (isNaN(date.getTime())) {
|
||||||
@ -25,3 +29,10 @@ export async function getLatestVideoTimestampFromAllData(client: Client) {
|
|||||||
return date.getTime();
|
return date.getTime();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Check whether the `tags` column of a video's row in `all_data` is NULL.
 *
 * @param client Client or transaction the query is executed on.
 * @param aid The video's aid.
 * @returns `true` if a row with this aid exists and its `tags` is NULL,
 * `false` otherwise (a missing row also yields `false`, since the check is an EXISTS).
 */
export async function videoTagsIsNull(client: Client | Transaction, aid: number) {
    const queryResult = await client.queryObject<{ exists: boolean }>(
        `SELECT EXISTS(SELECT 1 FROM all_data WHERE aid = $1 AND tags IS NULL)`,
        [aid],
    );
    // EXISTS always produces exactly one row, so rows[0] is the answer.
    return queryResult.rows[0].exists;
}
|
||||||
|
@ -1,27 +1,6 @@
|
|||||||
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
import { Pool } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import {postgresConfig} from "lib/db/pgConfig.ts";
|
||||||
|
|
||||||
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"];
|
const pool = new Pool(postgresConfig, 32);
|
||||||
|
|
||||||
const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined);
|
|
||||||
|
|
||||||
if (unsetVars.length > 0) {
|
|
||||||
throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const databaseHost = Deno.env.get("DB_HOST")!;
|
|
||||||
const databaseName = Deno.env.get("DB_NAME")!;
|
|
||||||
const databaseUser = Deno.env.get("DB_USER")!;
|
|
||||||
const databasePassword = Deno.env.get("DB_PASSWORD")!;
|
|
||||||
const databasePort = Deno.env.get("DB_PORT")!;
|
|
||||||
|
|
||||||
const postgresConfig = {
|
|
||||||
hostname: databaseHost,
|
|
||||||
port: parseInt(databasePort),
|
|
||||||
database: databaseName,
|
|
||||||
user: databaseUser,
|
|
||||||
password: databasePassword,
|
|
||||||
};
|
|
||||||
|
|
||||||
const pool = new Pool(postgresConfig, 4);
|
|
||||||
|
|
||||||
export const db = pool;
|
export const db = pool;
|
||||||
|
21
lib/db/pgConfig.ts
Normal file
21
lib/db/pgConfig.ts
Normal file
@ -0,0 +1,21 @@
|
|||||||
|
/*
 * PostgreSQL connection settings, read from the environment when this module is loaded.
 *
 * Importing this module throws if any required variable is missing or if DB_PORT
 * is not a valid TCP port, so misconfiguration surfaces at startup instead of on
 * the first query.
 */
const requiredEnvVars = ["DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD", "DB_PORT"];

const unsetVars = requiredEnvVars.filter((key) => Deno.env.get(key) === undefined);

if (unsetVars.length > 0) {
    throw new Error(`Missing required environment variables: ${unsetVars.join(", ")}`);
}

// The check above guarantees every variable is defined, hence the non-null assertions.
const databaseHost = Deno.env.get("DB_HOST")!;
const databaseName = Deno.env.get("DB_NAME")!;
const databaseUser = Deno.env.get("DB_USER")!;
const databasePassword = Deno.env.get("DB_PASSWORD")!;
const databasePort = Deno.env.get("DB_PORT")!;

// Parse with an explicit radix and reject NaN / out-of-range values; otherwise a
// typo in DB_PORT would silently produce `port: NaN` in the exported config.
const parsedPort = parseInt(databasePort, 10);
if (Number.isNaN(parsedPort) || parsedPort < 1 || parsedPort > 65535) {
    throw new Error(`Invalid DB_PORT value: ${databasePort}`);
}

export const postgresConfig = {
    hostname: databaseHost,
    port: parsedPort,
    database: databaseName,
    user: databaseUser,
    password: databasePassword,
};
|
@ -1,7 +1,7 @@
|
|||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
import { RateLimiter } from "lib/mq/rateLimiter.ts";
|
import {RateLimiter} from "lib/mq/rateLimiter.ts";
|
||||||
import { SlidingWindow } from "lib/mq/slidingWindow.ts";
|
import {SlidingWindow} from "lib/mq/slidingWindow.ts";
|
||||||
import { redis } from "lib/db/redis.ts";
|
import {redis} from "lib/db/redis.ts";
|
||||||
import Redis from "ioredis";
|
import Redis from "ioredis";
|
||||||
|
|
||||||
interface Proxy {
|
interface Proxy {
|
||||||
@ -23,10 +23,12 @@ type NetSchedulerErrorCode =
|
|||||||
|
|
||||||
export class NetSchedulerError extends Error {
|
export class NetSchedulerError extends Error {
|
||||||
public errorCode: NetSchedulerErrorCode;
|
public errorCode: NetSchedulerErrorCode;
|
||||||
constructor(message: string, errorCode: NetSchedulerErrorCode) {
|
public rawError: unknown | undefined;
|
||||||
|
constructor(message: string, errorCode: NetSchedulerErrorCode, rawError?: unknown) {
|
||||||
super(message);
|
super(message);
|
||||||
this.name = "NetSchedulerError";
|
this.name = "NetSchedulerError";
|
||||||
this.errorCode = errorCode;
|
this.errorCode = errorCode;
|
||||||
|
this.rawError = rawError;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -56,7 +58,7 @@ class NetScheduler {
|
|||||||
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
* - The native `fetch` function threw an error: with error code FETCH_ERROR
|
||||||
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
* - The proxy type is not supported: with error code NOT_IMPLEMENTED
|
||||||
*/
|
*/
|
||||||
async request<R>(url: string, method: string = "GET", task: string): Promise<R | null> {
|
async request<R>(url: string, task: string, method: string = "GET"): Promise<R> {
|
||||||
// find a available proxy
|
// find a available proxy
|
||||||
const proxiesNames = Object.keys(this.proxies);
|
const proxiesNames = Object.keys(this.proxies);
|
||||||
for (const proxyName of proxiesNames) {
|
for (const proxyName of proxiesNames) {
|
||||||
@ -133,29 +135,28 @@ class NetScheduler {
|
|||||||
private async nativeRequest<R>(url: string, method: string): Promise<R> {
|
private async nativeRequest<R>(url: string, method: string): Promise<R> {
|
||||||
try {
|
try {
|
||||||
const response = await fetch(url, { method });
|
const response = await fetch(url, { method });
|
||||||
const data = await response.json() as R;
|
return await response.json() as R;
|
||||||
return data;
|
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
logger.error(e as Error);
|
throw new NetSchedulerError("Fetch error", "FETCH_ERROR", e);
|
||||||
throw new NetSchedulerError("Fetch error", "FETCH_ERROR");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const netScheduler = new NetScheduler();
|
const netScheduler = new NetScheduler();
|
||||||
|
netScheduler.addProxy("default", "native", "default");
|
||||||
netScheduler.addProxy("tags-native", "native", "getVideoTags");
|
netScheduler.addProxy("tags-native", "native", "getVideoTags");
|
||||||
const tagsRateLimiter = new RateLimiter("getVideoTags", [
|
const tagsRateLimiter = new RateLimiter("getVideoTags", [
|
||||||
{
|
{
|
||||||
window: new SlidingWindow(redis, 1.2),
|
window: new SlidingWindow(redis, 1),
|
||||||
max: 1,
|
max: 3,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
window: new SlidingWindow(redis, 30),
|
window: new SlidingWindow(redis, 30),
|
||||||
max: 5,
|
max: 30,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
window: new SlidingWindow(redis, 5 * 60),
|
window: new SlidingWindow(redis, 2 * 60),
|
||||||
max: 70,
|
max: 50,
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
netScheduler.setProxyLimiter("tags-native", tagsRateLimiter);
|
netScheduler.setProxyLimiter("tags-native", tagsRateLimiter);
|
||||||
|
@ -1,19 +1,35 @@
|
|||||||
import { VideoTagsResponse } from "lib/net/bilibili.d.ts";
|
import { VideoTagsResponse } from "lib/net/bilibili.d.ts";
|
||||||
|
import netScheduler, {NetSchedulerError} from "lib/mq/scheduler.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fetch the tags for a video
|
||||||
|
* @param {number} aid The video's aid
|
||||||
|
* @return {Promise<string[] | null>} A promise, which resolves to an array of tags,
|
||||||
|
* or null if a `fetch` error occurred
|
||||||
|
* @throws {NetSchedulerError} If the request failed.
|
||||||
|
*/
|
||||||
export async function getVideoTags(aid: number): Promise<string[] | null> {
|
export async function getVideoTags(aid: number): Promise<string[] | null> {
|
||||||
try {
|
try {
|
||||||
const url = `https://api.bilibili.com/x/tag/archive/tags?aid=${aid}`;
|
const url = `https://api.bilibili.com/x/tag/archive/tags?aid=${aid}`;
|
||||||
const res = await fetch(url);
|
const data = await netScheduler.request<VideoTagsResponse>(url, 'getVideoTags');
|
||||||
const data: VideoTagsResponse = await res.json();
|
|
||||||
if (data.code != 0) {
|
if (data.code != 0) {
|
||||||
logger.error(`Error fetching tags for video ${aid}: ${data.message}`, 'net', 'getVideoTags');
|
logger.error(`Error fetching tags for video ${aid}: ${data.message}`, 'net', 'getVideoTags');
|
||||||
return [];
|
return [];
|
||||||
}
|
}
|
||||||
return data.data.map((tag) => tag.tag_name);
|
return data.data.map((tag) => tag.tag_name);
|
||||||
}
|
}
|
||||||
catch {
|
catch (e) {
|
||||||
logger.error(`Error fetching tags for video ${aid}`, 'net', 'getVideoTags');
|
const error = e as NetSchedulerError;
|
||||||
return null;
|
if (error.errorCode == "FETCH_ERROR") {
|
||||||
|
const rawError = error.rawError! as Error;
|
||||||
|
rawError.message = `Error fetching tags for video ${aid}: ` + rawError.message;
|
||||||
|
logger.error(rawError, 'net', 'getVideoTags');
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
// Re-throw the error
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
33
test/db/videoTagIsNull.test.ts
Normal file
33
test/db/videoTagIsNull.test.ts
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
import { assertEquals } from "jsr:@std/assert";
|
||||||
|
import { videoTagsIsNull } from "lib/db/allData.ts";
|
||||||
|
import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts";
|
||||||
|
import { postgresConfig } from "lib/db/pgConfig.ts";
|
||||||
|
|
||||||
|
// A minimal aid which has an empty tags field in our database
|
||||||
|
const TEST_AID = 63569;
|
||||||
|
|
||||||
|
/*
 * Integration test for videoTagsIsNull against the configured database.
 *
 * All statements run inside one transaction that is rolled back at the end,
 * so the UPDATE used to force `tags` to NULL is not persisted.
 */
Deno.test("videoTagsIsNull function", async () => {
    const client = new Client(postgresConfig);

    try {
        const transaction = client.createTransaction("test_transaction");
        await transaction.begin();

        // Before the update: the row is expected to have non-NULL tags.
        const result1 = await videoTagsIsNull(transaction, TEST_AID);
        assertEquals(typeof result1, "boolean", "The result should be a boolean value.");
        assertEquals(result1, false, "The result should be false if tags is not NULL for the given aid.");

        await transaction.queryArray`UPDATE all_data SET tags = NULL WHERE aid = ${TEST_AID}`;

        // After the update: the same row must now be reported as having NULL tags.
        const result2 = await videoTagsIsNull(transaction, TEST_AID);
        assertEquals(typeof result2, "boolean", "The result should be a boolean value.");
        assertEquals(result2, true, "The result should be true if tags is NULL for the given aid.");

        await transaction.rollback();
    } catch (error) {
        console.error("Error during test:", error);
        throw error;
    } finally {
        // Await the shutdown: client.end() returns a promise, and leaving it floating
        // lets the test finish while the connection is still being closed.
        await client.end();
    }
});
|
@ -3,7 +3,7 @@ import { getLatestVideosWorker } from "lib/mq/executors.ts";
|
|||||||
import { redis } from "lib/db/redis.ts";
|
import { redis } from "lib/db/redis.ts";
|
||||||
import logger from "lib/log/logger.ts";
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
const worker = new Worker(
|
const crawlerWorker = new Worker(
|
||||||
"cvsa",
|
"cvsa",
|
||||||
async (job: Job) => {
|
async (job: Job) => {
|
||||||
switch (job.name) {
|
switch (job.name) {
|
||||||
@ -14,13 +14,13 @@ const worker = new Worker(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redis, concurrency: 4 },
|
{ connection: redis, concurrency: 10 },
|
||||||
);
|
);
|
||||||
|
|
||||||
worker.on("active", () => {
|
crawlerWorker.on("active", () => {
|
||||||
logger.log("Worker activated.", "mq");
|
logger.log("Worker activated.", "mq");
|
||||||
});
|
});
|
||||||
|
|
||||||
worker.on("error", (err) => {
|
crawlerWorker.on("error", (err) => {
|
||||||
logger.error(err);
|
logger.error(err);
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user