update: bulk fetch
This commit is contained in:
parent
d44ba8a0ae
commit
aea9e10d1a
@ -271,6 +271,13 @@ export async function setSnapshotStatus(client: Client, id: number, status: stri
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function bulkSetSnapshotStatus(client: Client, ids: number[], status: string) {
|
||||||
|
return await client.queryObject(
|
||||||
|
`UPDATE snapshot_schedule SET status = $2 WHERE id = ANY($1)`,
|
||||||
|
[ids, status],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
export async function getVideosWithoutActiveSnapshotSchedule(client: Client) {
|
||||||
const query: string = `
|
const query: string = `
|
||||||
SELECT s.aid
|
SELECT s.aid
|
||||||
|
@ -2,6 +2,7 @@ import { Job } from "bullmq";
|
|||||||
import { db } from "lib/db/init.ts";
|
import { db } from "lib/db/init.ts";
|
||||||
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
|
import { getLatestVideoSnapshot, getVideosNearMilestone } from "lib/db/snapshot.ts";
|
||||||
import {
|
import {
|
||||||
|
bulkSetSnapshotStatus,
|
||||||
findClosestSnapshot,
|
findClosestSnapshot,
|
||||||
findSnapshotBefore,
|
findSnapshotBefore,
|
||||||
getLatestSnapshot,
|
getLatestSnapshot,
|
||||||
@ -23,6 +24,7 @@ import { getBiliVideoStatus, setBiliVideoStatus } from "lib/db/allData.ts";
|
|||||||
import { truncate } from "lib/utils/truncate.ts";
|
import { truncate } from "lib/utils/truncate.ts";
|
||||||
import { lockManager } from "lib/mq/lockManager.ts";
|
import { lockManager } from "lib/mq/lockManager.ts";
|
||||||
import { getSongsPublihsedAt } from "lib/db/songs.ts";
|
import { getSongsPublihsedAt } from "lib/db/songs.ts";
|
||||||
|
import { bulkGetVideoStats } from "lib/net/bulkGetVideoStats.ts";
|
||||||
|
|
||||||
const priorityMap: { [key: string]: number } = {
|
const priorityMap: { [key: string]: number } = {
|
||||||
"milestone": 1,
|
"milestone": 1,
|
||||||
@ -187,6 +189,31 @@ export const regularSnapshotsWorker = async (_job: Job) => {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export const takeBulkSnapshotForVideosWorker = async (job: Job) => {
|
||||||
|
const dataMap: {[key: number]: number} = job.data.map;
|
||||||
|
const ids = Object.keys(dataMap).map((id) => Number(id));
|
||||||
|
const aidsToFetch: number[] = [];
|
||||||
|
const client = await db.connect();
|
||||||
|
try {
|
||||||
|
for (const id of ids) {
|
||||||
|
const aid = Number(dataMap[id]);
|
||||||
|
const exists = await snapshotScheduleExists(client, id);
|
||||||
|
if (!exists) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
aidsToFetch.push(aid);
|
||||||
|
}
|
||||||
|
const data = await bulkGetVideoStats(aidsToFetch);
|
||||||
|
if (typeof data === "number") {
|
||||||
|
await bulkSetSnapshotStatus(client, ids, "failed");
|
||||||
|
return `GET_BILI_STATUS_${data}`;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
client.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
export const takeSnapshotForVideoWorker = async (job: Job) => {
|
||||||
const id = job.data.id;
|
const id = job.data.id;
|
||||||
const aid = Number(job.data.aid);
|
const aid = Number(job.data.aid);
|
||||||
|
@ -339,6 +339,12 @@ bili_test[1].max = 36;
|
|||||||
bili_test[2].max = 150;
|
bili_test[2].max = 150;
|
||||||
bili_test[3].max = 1000;
|
bili_test[3].max = 1000;
|
||||||
|
|
||||||
|
const bili_strict = biliLimiterConfig;
|
||||||
|
bili_strict[0].max = 4;
|
||||||
|
bili_strict[1].max = 8;
|
||||||
|
bili_strict[2].max = 30;
|
||||||
|
bili_strict[3].max = 100;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Execution order for setup:
|
Execution order for setup:
|
||||||
|
|
||||||
@ -378,11 +384,21 @@ netScheduler.addTask("snapshotVideo", "bili_test", [
|
|||||||
"alicloud-shenzhen",
|
"alicloud-shenzhen",
|
||||||
"alicloud-hohhot",
|
"alicloud-hohhot",
|
||||||
]);
|
]);
|
||||||
|
netScheduler.addTask("bulkSnapshot", "bili_strict", [
|
||||||
|
"alicloud-qingdao",
|
||||||
|
"alicloud-shanghai",
|
||||||
|
"alicloud-zhangjiakou",
|
||||||
|
"alicloud-chengdu",
|
||||||
|
"alicloud-shenzhen",
|
||||||
|
"alicloud-hohhot",
|
||||||
|
]);
|
||||||
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
|
netScheduler.setTaskLimiter("getVideoInfo", videoInfoRateLimiterConfig);
|
||||||
netScheduler.setTaskLimiter("getLatestVideos", null);
|
netScheduler.setTaskLimiter("getLatestVideos", null);
|
||||||
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
|
netScheduler.setTaskLimiter("snapshotMilestoneVideo", null);
|
||||||
netScheduler.setTaskLimiter("snapshotVideo", null);
|
netScheduler.setTaskLimiter("snapshotVideo", null);
|
||||||
|
netScheduler.setTaskLimiter("bulkSnapshot", null);
|
||||||
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
|
netScheduler.setProviderLimiter("bilibili", biliLimiterConfig);
|
||||||
netScheduler.setProviderLimiter("bili_test", bili_test);
|
netScheduler.setProviderLimiter("bili_test", bili_test);
|
||||||
|
netScheduler.setProviderLimiter("bili_strict", bili_strict);
|
||||||
|
|
||||||
export default netScheduler;
|
export default netScheduler;
|
||||||
|
5
lib/net/bilibili.d.ts
vendored
5
lib/net/bilibili.d.ts
vendored
@ -11,10 +11,11 @@ export type VideoTagsResponse = BaseResponse<VideoTagsData>;
|
|||||||
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
|
export type VideoInfoResponse = BaseResponse<VideoInfoData>;
|
||||||
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
|
export type MediaListInfoResponse = BaseResponse<MediaListInfoData>;
|
||||||
|
|
||||||
type MediaListInfoData = MediaListInfoItem[];
|
export type MediaListInfoData = MediaListInfoItem[];
|
||||||
|
|
||||||
|
|
||||||
interface MediaListInfoItem {
|
export interface MediaListInfoItem {
|
||||||
|
attr: number;
|
||||||
bvid: string;
|
bvid: string;
|
||||||
id: number;
|
id: number;
|
||||||
cnt_info: {
|
cnt_info: {
|
||||||
|
27
lib/net/bulkGetVideoStats.ts
Normal file
27
lib/net/bulkGetVideoStats.ts
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
import netScheduler from "lib/mq/scheduler.ts";
|
||||||
|
import { MediaListInfoData, MediaListInfoResponse } from "lib/net/bilibili.d.ts";
|
||||||
|
import logger from "lib/log/logger.ts";
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Bulk fetch video metadata from bilibili API
|
||||||
|
* @param {number[]} aids - The aid list to fetch
|
||||||
|
* @returns {Promise<MediaListInfoData | number>} MediaListInfoData or the error code returned by bilibili API
|
||||||
|
* @throws {NetSchedulerError} - The error will be thrown in following cases:
|
||||||
|
* - No proxy is available currently: with error code `NO_PROXY_AVAILABLE`
|
||||||
|
* - The native `fetch` function threw an error: with error code `FETCH_ERROR`
|
||||||
|
* - The alicloud-fc threw an error: with error code `ALICLOUD_FC_ERROR`
|
||||||
|
*/
|
||||||
|
export async function bulkGetVideoStats(aids: number[]): Promise<MediaListInfoData | number> {
|
||||||
|
const baseURL = `https://api.bilibili.com/medialist/gateway/base/resource/infos?resources=`;
|
||||||
|
let url = baseURL;
|
||||||
|
for (const aid of aids) {
|
||||||
|
url += `${aid}:2,`;
|
||||||
|
}
|
||||||
|
const data = await netScheduler.request<MediaListInfoResponse>(url, "bulkSnapshot");
|
||||||
|
const errMessage = `Error fetching metadata for aid list: ${aids.join(",")}:`;
|
||||||
|
if (data.code !== 0) {
|
||||||
|
logger.error(errMessage + data.code + "-" + data.message, "net", "fn:getVideoInfo");
|
||||||
|
return data.code;
|
||||||
|
}
|
||||||
|
return data.data;
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user