From a9582722f45f686085eddf9a0695457c63448b1a Mon Sep 17 00:00:00 2001 From: alikia2x Date: Sun, 13 Apr 2025 05:06:52 +0800 Subject: [PATCH] ref: wrap some worker functions with withDbConnection --- .zed/settings.json | 58 ++++++++++----------- packages/crawler/mq/exec/collectSongs.ts | 13 ++--- packages/crawler/mq/exec/executors.ts | 1 + packages/crawler/mq/exec/getLatestVideos.ts | 16 +++--- packages/crawler/mq/exec/getVideoInfo.ts | 20 ++++--- packages/crawler/src/worker.ts | 2 +- 6 files changed, 52 insertions(+), 58 deletions(-) diff --git a/.zed/settings.json b/.zed/settings.json index a58d028..97f9ab8 100644 --- a/.zed/settings.json +++ b/.zed/settings.json @@ -3,33 +3,33 @@ // For a full list of overridable settings, and general information on folder-specific settings, // see the documentation: https://zed.dev/docs/configuring-zed#settings-files { - "lsp": { - "deno": { - "settings": { - "deno": { - "enable": true - } - } - } - }, - "languages": { - "TypeScript": { - "language_servers": [ - "deno", - "!typescript-language-server", - "!vtsls", - "!eslint" - ], - "formatter": "language_server" - }, - "TSX": { - "language_servers": [ - "deno", - "!typescript-language-server", - "!vtsls", - "!eslint" - ], - "formatter": "language_server" - } - } + "lsp": { + "deno": { + "settings": { + "deno": { + "enable": true + } + } + } + }, + "languages": { + "TypeScript": { + "language_servers": [ + "deno", + "!typescript-language-server", + "!vtsls", + "!eslint" + ], + "formatter": "language_server" + }, + "TSX": { + "language_servers": [ + "deno", + "!typescript-language-server", + "!vtsls", + "!eslint" + ], + "formatter": "language_server" + } + } } diff --git a/packages/crawler/mq/exec/collectSongs.ts b/packages/crawler/mq/exec/collectSongs.ts index dc059fc..1e4db3c 100644 --- a/packages/crawler/mq/exec/collectSongs.ts +++ b/packages/crawler/mq/exec/collectSongs.ts @@ -1,12 +1,9 @@ import { Job } from "npm:bullmq@5.45.2"; -import { db } from "db/init.ts"; import { collectSongs } from "mq/task/collectSongs.ts"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -export const collectSongsWorker = async (_job: Job): Promise => { - const client = await db.connect(); - try { +export const collectSongsWorker = (_job: Job): Promise => + withDbConnection(async (client: Client) => { await collectSongs(client); - } finally { - client.release(); - } -}; + }); diff --git a/packages/crawler/mq/exec/executors.ts b/packages/crawler/mq/exec/executors.ts index f3fe95b..cddb5e4 100644 --- a/packages/crawler/mq/exec/executors.ts +++ b/packages/crawler/mq/exec/executors.ts @@ -1,3 +1,4 @@ export * from "mq/exec/getLatestVideos.ts"; export * from "./getVideoInfo.ts"; export * from "./collectSongs.ts"; +export * from "./takeBulkSnapshot.ts"; \ No newline at end of file diff --git a/packages/crawler/mq/exec/getLatestVideos.ts b/packages/crawler/mq/exec/getLatestVideos.ts index 4112ecd..db04b97 100644 --- a/packages/crawler/mq/exec/getLatestVideos.ts +++ b/packages/crawler/mq/exec/getLatestVideos.ts @@ -1,12 +1,10 @@ import { Job } from "bullmq"; import { queueLatestVideos } from "mq/task/queueLatestVideo.ts"; -import { db } from "db/init.ts"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; -export const getLatestVideosWorker = async (_job: Job): Promise => { - const client = await db.connect(); - try { - await queueLatestVideos(client); - } finally { - client.release(); - } -}; + +export const getLatestVideosWorker = (_job: Job): Promise => + withDbConnection(async (client: Client) => { + await queueLatestVideos(client) + }); diff --git a/packages/crawler/mq/exec/getVideoInfo.ts b/packages/crawler/mq/exec/getVideoInfo.ts index 3bdf038..5397faf 100644 --- a/packages/crawler/mq/exec/getVideoInfo.ts +++ b/packages/crawler/mq/exec/getVideoInfo.ts @@ -1,17 +1,15 @@ import { Job } from "npm:bullmq@5.45.2"; -import { db } from "db/init.ts"; import { insertVideoInfo } from "mq/task/getVideoDetails.ts"; +import { withDbConnection } from "db/withConnection.ts"; +import { Client } from "https://deno.land/x/postgres@v0.19.3/mod.ts"; +import logger from "log/logger.ts"; -export const getVideoInfoWorker = async (job: Job): Promise => { - const client = await db.connect(); - try { +export const getVideoInfoWorker = async (job: Job): Promise => + await withDbConnection(async (client: Client) => { const aid = job.data.aid; if (!aid) { - return 3; + logger.warn("aid does not exists", "mq", "job:getVideoInfo"); + return; } - await insertVideoInfo(client, aid); - return 0; - } finally { - client.release(); - } -}; + await insertVideoInfo(client, aid) + }); diff --git a/packages/crawler/src/worker.ts b/packages/crawler/src/worker.ts index 0c0a96e..9b92913 100644 --- a/packages/crawler/src/worker.ts +++ b/packages/crawler/src/worker.ts @@ -12,7 +12,7 @@ import { snapshotTickWorker, takeSnapshotForVideoWorker, } from "mq/exec/snapshotTick.ts"; -import {takeBulkSnapshotForVideosWorker} from "../mq/exec/takeBulkSnapshot.ts"; +import {takeBulkSnapshotForVideosWorker} from "mq/exec/executors.ts"; Deno.addSignalListener("SIGINT", async () => { logger.log("SIGINT Received: Shutting down workers...", "mq");