From bdc10f17a1a128b2bbb4806a7e56eab6fde04ed9 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Fri, 9 Jan 2026 15:37:51 +0800 Subject: [PATCH] fix: several problems causing Redis window count lossing sync with ground truth --- packages/core/drizzle/main/schema.ts | 171 +++++++++------------- packages/crawler/db/snapshotSchedule.ts | 186 ++++++++++++++++-------- 2 files changed, 194 insertions(+), 163 deletions(-) diff --git a/packages/core/drizzle/main/schema.ts b/packages/core/drizzle/main/schema.ts index 8da0a25..81d819b 100644 --- a/packages/core/drizzle/main/schema.ts +++ b/packages/core/drizzle/main/schema.ts @@ -1,4 +1,4 @@ -import { pgTable, pgSchema, uniqueIndex, check, integer, text, timestamp, foreignKey, serial, bigint, jsonb, index, inet, varchar, smallint, real, boolean, bigserial, unique, pgSequence } from "drizzle-orm/pg-core" +import { pgTable, pgSchema, uniqueIndex, integer, text, timestamp, foreignKey, serial, bigint, jsonb, index, inet, varchar, smallint, real, boolean, unique, doublePrecision, vector, bigserial, pgView, numeric, pgSequence } from "drizzle-orm/pg-core" import { sql } from "drizzle-orm" export const credentials = pgSchema("credentials"); @@ -26,12 +26,6 @@ export const usersInCredentials = credentials.table("users", { uniqueIndex("users_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), uniqueIndex("users_pkey1").using("btree", table.id.asc().nullsLast().op("int4_ops")), uniqueIndex("users_username_key").using("btree", table.username.asc().nullsLast().op("text_ops")), - check("users_id_not_null", sql`NOT NULL id`), - check("users_username_not_null", sql`NOT NULL username`), - check("users_password_not_null", sql`NOT NULL password`), - check("users_unq_id_not_null", sql`NOT NULL unq_id`), - check("users_role_not_null", sql`NOT NULL role`), - check("users_created_at_not_null", sql`NOT NULL created_at`), ]); export const history = pgTable("history", { @@ -48,11 +42,6 @@ export const history = pgTable("history", { foreignColumns: [usersInCredentials.unqId], name: "rel_history_changed_by" }), - check("history_id_not_null", sql`NOT NULL id`), - check("history_object_id_not_null", sql`NOT NULL object_id`), - check("history_change_type_not_null", sql`NOT NULL change_type`), - check("history_changed_at_not_null", sql`NOT NULL changed_at`), - check("history_changed_by_not_null", sql`NOT NULL changed_by`), ]); export const loginSessionsInCredentials = credentials.table("login_sessions", { @@ -67,9 +56,23 @@ export const loginSessionsInCredentials = credentials.table("login_sessions", { }, (table) => [ index("inx_login-sessions_uid").using("btree", table.uid.asc().nullsLast().op("int4_ops")), uniqueIndex("login_sessions_pkey").using("btree", table.id.asc().nullsLast().op("text_ops")), - check("login_sessions_id_not_null", sql`NOT NULL id`), - check("login_sessions_uid_not_null", sql`NOT NULL uid`), - check("login_sessions_created_at_not_null", sql`NOT NULL created_at`), +]); + +export const videoSnapshot = pgTable("video_snapshot", { + id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + views: integer().notNull(), + coins: integer(), + likes: integer(), + favorites: integer(), + shares: integer(), + danmakus: integer(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + aid: bigint({ mode: "number" }).notNull(), + replies: integer(), +}, (table) => [ + index("video_snapshot_new_aid_created_at_idx").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("int8_ops")), + index("video_snapshot_new_created_at_idx").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), ]); export const bilibiliMetadata = pgTable("bilibili_metadata", { @@ -94,9 +97,6 @@ export const bilibiliMetadata = pgTable("bilibili_metadata", { index("idx_all-data_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")), index("idx_bili-meta_status").using("btree", table.status.asc().nullsLast().op("int4_ops")), uniqueIndex("unq_all-data_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), - check("bilibili_metadata_id_not_null", sql`NOT NULL id`), - check("bilibili_metadata_aid_not_null", sql`NOT NULL aid`), - check("bilibili_metadata_status_not_null", sql`NOT NULL status`), ]); export const humanClassifiedLables = pgTable("human_classified_lables", { @@ -111,14 +111,9 @@ export const humanClassifiedLables = pgTable("human_classified_lables", { index("idx_classified-labels-human_author").using("btree", table.uid.asc().nullsLast().op("int4_ops")), index("idx_classified-labels-human_created-at").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), index("idx_classified-labels-human_label").using("btree", table.label.asc().nullsLast().op("int2_ops")), - check("human_classified_lables_id_not_null", sql`NOT NULL id`), - check("human_classified_lables_aid_not_null", sql`NOT NULL aid`), - check("human_classified_lables_uid_not_null", sql`NOT NULL uid`), - check("human_classified_lables_label_not_null", sql`NOT NULL label`), - check("human_classified_lables_created_at_not_null", sql`NOT NULL created_at`), ]); -export const videoSnapshot = pgTable("video_snapshot", { +export const videoSnapshotBackup = pgTable("video_snapshot_backup", { id: integer().default(sql`nextval('video_snapshot_id_seq'::regclass)`).notNull(), createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), views: integer().notNull(), @@ -133,19 +128,12 @@ export const videoSnapshot = pgTable("video_snapshot", { }, (table) => [ index("idx_vid_snapshot_aid_created_at").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.createdAt.asc().nullsLast().op("int8_ops")), index("idx_vid_snapshot_time").using("btree", table.createdAt.asc().nullsLast().op("timestamptz_ops")), - check("video_snapshot_id_not_null", sql`NOT NULL id`), - check("video_snapshot_created_at_not_null", sql`NOT NULL created_at`), - check("video_snapshot_views_not_null", sql`NOT NULL views`), - check("video_snapshot_aid_not_null", sql`NOT NULL aid`), ]); export const producer = pgTable("producer", { id: integer().primaryKey().notNull(), name: text().notNull(), -}, (table) => [ - check("producer_id_not_null", sql`NOT NULL id`), - check("producer_name_not_null", sql`NOT NULL name`), -]); +}); export const labellingResult = pgTable("labelling_result", { id: integer().default(sql`nextval('labeling_result_id_seq'::regclass)`).notNull(), @@ -161,11 +149,6 @@ export const labellingResult = pgTable("labelling_result", { index("idx_labelling_aid-label").using("btree", table.aid.asc().nullsLast().op("int2_ops"), table.label.asc().nullsLast().op("int2_ops")), uniqueIndex("labeling_result_pkey").using("btree", table.id.asc().nullsLast().op("int4_ops")), uniqueIndex("unq_labelling-result_aid_model-version").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.modelVersion.asc().nullsLast().op("int8_ops")), - check("labelling_result_id_not_null", sql`NOT NULL id`), - check("labelling_result_aid_not_null", sql`NOT NULL aid`), - check("labelling_result_label_not_null", sql`NOT NULL label`), - check("labelling_result_model_version_not_null", sql`NOT NULL model_version`), - check("labelling_result_created_at_not_null", sql`NOT NULL created_at`), ]); export const latestVideoSnapshot = pgTable("latest_video_snapshot", { @@ -182,15 +165,6 @@ export const latestVideoSnapshot = pgTable("latest_video_snapshot", { }, (table) => [ index("idx_latest-video-snapshot_time").using("btree", table.time.asc().nullsLast().op("timestamptz_ops")), index("idx_latest-video-snapshot_views").using("btree", table.views.asc().nullsLast().op("int4_ops")), - check("latest_video_snapshot_aid_not_null", sql`NOT NULL aid`), - check("latest_video_snapshot_time_not_null", sql`NOT NULL "time"`), - check("latest_video_snapshot_views_not_null", sql`NOT NULL views`), - check("latest_video_snapshot_coins_not_null", sql`NOT NULL coins`), - check("latest_video_snapshot_likes_not_null", sql`NOT NULL likes`), - check("latest_video_snapshot_favorites_not_null", sql`NOT NULL favorites`), - check("latest_video_snapshot_replies_not_null", sql`NOT NULL replies`), - check("latest_video_snapshot_danmakus_not_null", sql`NOT NULL danmakus`), - check("latest_video_snapshot_shares_not_null", sql`NOT NULL shares`), ]); export const relationsProducer = pgTable("relations_producer", { @@ -206,11 +180,6 @@ export const relationsProducer = pgTable("relations_producer", { foreignColumns: [songs.id], name: "fkey_relations_producer_songs_id" }), - check("relations_producer_id_not_null", sql`NOT NULL id`), - check("relations_producer_song_id_not_null", sql`NOT NULL song_id`), - check("relations_producer_created_at_not_null", sql`NOT NULL created_at`), - check("relations_producer_producer_id_not_null", sql`NOT NULL producer_id`), - check("relations_producer_updated_at_not_null", sql`NOT NULL updated_at`), ]); export const eta = pgTable("eta", { @@ -222,20 +191,12 @@ export const eta = pgTable("eta", { updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(), }, (table) => [ index("idx_eta_eta_current_views").using("btree", table.eta.asc().nullsLast().op("int4_ops"), table.currentViews.asc().nullsLast().op("int4_ops")), - check("eta_aid_not_null", sql`NOT NULL aid`), - check("eta_eta_not_null", sql`NOT NULL eta`), - check("eta_speed_not_null", sql`NOT NULL speed`), - check("eta_current_views_not_null", sql`NOT NULL current_views`), - check("eta_updated_at_not_null", sql`NOT NULL updated_at`), ]); export const singer = pgTable("singer", { id: serial().primaryKey().notNull(), name: text().notNull(), -}, (table) => [ - check("singer_id_not_null", sql`NOT NULL id`), - check("singer_name_not_null", sql`NOT NULL name`), -]); +}); export const songs = pgTable("songs", { id: integer().default(sql`nextval('songs_id_seq'::regclass)`).notNull(), @@ -260,29 +221,6 @@ export const songs = pgTable("songs", { index("idx_type").using("btree", table.type.asc().nullsLast().op("int2_ops")), uniqueIndex("unq_songs_aid").using("btree", table.aid.asc().nullsLast().op("int8_ops")), uniqueIndex("unq_songs_netease_id").using("btree", table.neteaseId.asc().nullsLast().op("int8_ops")), - check("songs_id_not_null", sql`NOT NULL id`), - check("songs_created_at_not_null", sql`NOT NULL created_at`), - check("songs_updated_at_not_null", sql`NOT NULL updated_at`), - check("songs_deleted_not_null", sql`NOT NULL deleted`), -]); - -export const snapshotSchedule = pgTable("snapshot_schedule", { - id: bigserial({ mode: "bigint" }).notNull(), - // You can use { mode: "bigint" } if numbers are exceeding js number limitations - aid: bigint({ mode: "number" }).notNull(), - type: text(), - createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), - startedAt: timestamp("started_at", { withTimezone: true, mode: 'string' }), - finishedAt: timestamp("finished_at", { withTimezone: true, mode: 'string' }), - status: text().default('pending').notNull(), -}, (table) => [ - index("idx_snapshot_schedule_aid_status_type").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.status.asc().nullsLast().op("text_ops"), table.type.asc().nullsLast().op("text_ops")), - index("idx_snapshot_schedule_status_started_at").using("btree", table.status.asc().nullsLast().op("timestamptz_ops"), table.startedAt.asc().nullsLast().op("text_ops")), - uniqueIndex("snapshot_schedule_pkey").using("btree", table.id.asc().nullsLast().op("int8_ops")), - check("snapshot_schedule_id_not_null", sql`NOT NULL id`), - check("snapshot_schedule_aid_not_null", sql`NOT NULL aid`), - check("snapshot_schedule_created_at_not_null", sql`NOT NULL created_at`), - check("snapshot_schedule_status_not_null", sql`NOT NULL status`), ]); export const relationSinger = pgTable("relation_singer", { @@ -303,11 +241,6 @@ export const relationSinger = pgTable("relation_singer", { foreignColumns: [songs.id], name: "fkey_song_id" }), - check("relation_singer_id_not_null", sql`NOT NULL id`), - check("relation_singer_song_id_not_null", sql`NOT NULL song_id`), - check("relation_singer_singer_id_not_null", sql`NOT NULL singer_id`), - check("relation_singer_created_at_not_null", sql`NOT NULL created_at`), - check("relation_singer_updated_at_not_null", sql`NOT NULL updated_at`), ]); export const bilibiliUser = pgTable("bilibili_user", { @@ -323,29 +256,67 @@ export const bilibiliUser = pgTable("bilibili_user", { }, (table) => [ index("idx_bili-user_uid").using("btree", table.uid.asc().nullsLast().op("int8_ops")), unique("unq_bili-user_uid").on(table.uid), - check("bilibili_user_id_not_null", sql`NOT NULL id`), - check("bilibili_user_uid_not_null", sql`NOT NULL uid`), - check("bilibili_user_username_not_null", sql`NOT NULL username`), - check("bilibili_user_desc_not_null", sql`NOT NULL "desc"`), - check("bilibili_user_fans_not_null", sql`NOT NULL fans`), - check("bilibili_user_created_at_not_null", sql`NOT NULL created_at`), - check("bilibili_user_updated_at_not_null", sql`NOT NULL updated_at`), ]); +export const etaInInternal = internal.table("eta", { + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + aid: bigint({ mode: "number" }).primaryKey().notNull(), + increment15M: doublePrecision("increment_15m"), + increment2H: doublePrecision("increment_2h"), + increment5H: doublePrecision("increment_5h"), + increment12H: doublePrecision("increment_12h"), + increment1D: doublePrecision("increment_1d"), + increment3D: doublePrecision("increment_3d"), + increment7D: doublePrecision("increment_7d"), + increment14D: doublePrecision("increment_14d"), + increment30D: doublePrecision("increment_30d"), + targetMilestone: integer("target_milestone"), + etaHours: doublePrecision("eta_hours"), + nextSnapshot: timestamp("next_snapshot", { withTimezone: true, mode: 'string' }), + updatedAt: timestamp("updated_at", { withTimezone: true, mode: 'string' }), +}); + export const videoTypeLabelInInternal = internal.table("video_type_label", { id: serial().primaryKey().notNull(), // You can use { mode: "bigint" } if numbers are exceeding js number limitations aid: bigint({ mode: "number" }).notNull(), label: boolean().notNull(), - user: text().default('i3wW8JdZ9sT3ASkk').notNull(), + user: text().notNull(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), }, (table) => [ foreignKey({ columns: [table.user], foreignColumns: [usersInCredentials.unqId], name: "fkey_video_type_label_user" }), - check("video_type_label_id_not_null", sql`NOT NULL id`), - check("video_type_label_aid_not_null", sql`NOT NULL aid`), - check("video_type_label_label_not_null", sql`NOT NULL label`), - check("video_type_label_user_not_null", sql`NOT NULL "user"`), ]); + +export const embeddingsInInternal = internal.table("embeddings", { + id: serial().primaryKey().notNull(), + modelName: text("model_name").notNull(), + dataChecksum: text("data_checksum").notNull(), + vec2048: vector("vec_2048", { dimensions: 2048 }), + vec1536: vector("vec_1536", { dimensions: 1536 }), + vec1024: vector("vec_1024", { dimensions: 1024 }), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`), + dimensions: smallint().notNull(), +}, (table) => [ + unique("embeddings_unique_model_dimensions_checksum").on(table.modelName, table.dataChecksum, table.dimensions), +]); + +export const snapshotSchedule = pgTable("snapshot_schedule", { + id: bigserial({ mode: "number" }).notNull(), + // You can use { mode: "bigint" } if numbers are exceeding js number limitations + aid: bigint({ mode: "number" }).notNull(), + type: text(), + createdAt: timestamp("created_at", { withTimezone: true, mode: 'string' }).default(sql`CURRENT_TIMESTAMP`).notNull(), + startedAt: timestamp("started_at", { withTimezone: true, mode: 'string' }), + finishedAt: timestamp("finished_at", { withTimezone: true, mode: 'string' }), + status: text().default('pending').notNull(), + startedAt5MinUtc: timestamp("started_at_5min_utc", { mode: 'string' }).generatedAlwaysAs(sql`(date_trunc('hour'::text, (started_at AT TIME ZONE 'UTC'::text)) + ((((EXTRACT(minute FROM (started_at AT TIME ZONE 'UTC'::text)))::integer / 5))::double precision * '00:05:00'::interval))`), +}, (table) => [ + index("idx_snapshot_schedule_aid_status_type").using("btree", table.aid.asc().nullsLast().op("int8_ops"), table.status.asc().nullsLast().op("text_ops"), table.type.asc().nullsLast().op("int8_ops")), + index("idx_snapshot_schedule_pending_5min").using("btree", table.status.asc().nullsLast().op("text_ops"), table.startedAt5MinUtc.asc().nullsLast().op("timestamp_ops")).where(sql`(status = 'pending'::text)`), + index("idx_snapshot_schedule_status_started_at").using("btree", table.status.asc().nullsLast().op("timestamptz_ops"), table.startedAt.asc().nullsLast().op("text_ops")), + uniqueIndex("snapshot_schedule_pkey").using("btree", table.id.asc().nullsLast().op("int8_ops")), +]); \ No newline at end of file diff --git a/packages/crawler/db/snapshotSchedule.ts b/packages/crawler/db/snapshotSchedule.ts index 075983b..17ef71a 100644 --- a/packages/crawler/db/snapshotSchedule.ts +++ b/packages/crawler/db/snapshotSchedule.ts @@ -1,44 +1,47 @@ import type { Psql } from "@core/db/psql.d"; import { redis } from "@core/db/redis"; import type { SnapshotScheduleType } from "@core/db/schema.d"; +import { db, snapshotSchedule } from "@core/drizzle"; import { MINUTE } from "@core/lib"; import logger from "@core/log"; +import dayjs from "dayjs"; +import { eq, inArray } from "drizzle-orm"; import type { Redis } from "ioredis"; import { parseTimestampFromPsql } from "../utils/formatTimestampToPostgre"; const REDIS_KEY = "cvsa:snapshot_window_counts"; -function getCurrentWindowIndex(): number { - const now = new Date(); - const minutesSinceMidnight = now.getHours() * 60 + now.getMinutes(); - return Math.floor(minutesSinceMidnight / 5); +const WINDOW_SIZE = 5 * MINUTE; + +function getWindowFromDate(date: Date) { + const roundedMs = Math.floor(date.getTime() / WINDOW_SIZE) * WINDOW_SIZE; + return new Date(roundedMs); } export async function refreshSnapshotWindowCounts(sql: Psql, redisClient: Redis) { - const now = new Date(); - const startTime = now.getTime(); - const result = await sql<{ window_start: Date; count: number }[]>` SELECT - date_trunc('hour', started_at) + - (EXTRACT(minute FROM started_at)::int / 5 * INTERVAL '5 minutes') AS window_start, - COUNT(*) AS count - FROM snapshot_schedule - WHERE started_at >= NOW() AND status = 'pending' AND started_at <= NOW() + INTERVAL '14 days' - GROUP BY 1 - ORDER BY window_start + started_at_5min_utc AT TIME ZONE 'UTC' AS window_start, + count + FROM ( + SELECT + started_at_5min_utc, + COUNT(*) AS count + FROM snapshot_schedule + WHERE + status = 'pending' + AND started_at_5min_utc >= date_trunc('hour', now() AT TIME ZONE 'UTC') + AND started_at_5min_utc < date_trunc('hour', now() AT TIME ZONE 'UTC') + + interval '14 days' + GROUP BY started_at_5min_utc + ) t + ORDER BY started_at_5min_utc `; await redisClient.del(REDIS_KEY); - const currentWindow = getCurrentWindowIndex(); - for (const row of result) { - const targetOffset = Math.floor((row.window_start.getTime() - startTime) / (5 * MINUTE)); - const offset = currentWindow + targetOffset; - if (offset >= 0) { - await redisClient.hset(REDIS_KEY, offset.toString(), Number(row.count)); - } + await redisClient.hset(REDIS_KEY, row.window_start.toISOString(), Number(row.count)); } } @@ -49,11 +52,19 @@ export async function initSnapshotWindowCounts(sql: Psql, redisClient: Redis) { }, 5 * MINUTE); } -async function getWindowCount(redisClient: Redis, offset: number): Promise { - const count = await redisClient.hget(REDIS_KEY, offset.toString()); +async function getWindowCount(redisClient: Redis, window: Date): Promise { + const count = await redisClient.hget(REDIS_KEY, window.toISOString()); return count ? parseInt(count, 10) : 0; } +async function incrWindowCount(redisClient: Redis, window: Date) { + return redisClient.hincrby(REDIS_KEY, window.toISOString(), 1); +} + +async function decrWindowCount(redisClient: Redis, window: Date) { + return redisClient.hincrby(REDIS_KEY, window.toISOString(), -1); +} + export async function snapshotScheduleExists(sql: Psql, id: number) { const rows = await sql<{ id: number }[]>` SELECT id @@ -224,7 +235,7 @@ export async function scheduleSnapshot( ) { let adjustedTime = new Date(targetTime); const hasActiveSchedule = await videoHasActiveScheduleWithType(sql, aid, type); - if (type == "milestone" && hasActiveSchedule) { + if (type === "milestone" && hasActiveSchedule) { const latestActiveSchedule = await getLatestActiveScheduleWithType(sql, aid, type); if (!latestActiveSchedule) { return; @@ -233,11 +244,21 @@ export async function scheduleSnapshot( parseTimestampFromPsql(latestActiveSchedule.started_at) ); if (latestScheduleStartedAt > adjustedTime) { - await sql` - UPDATE snapshot_schedule - SET started_at = ${adjustedTime.toISOString()} - WHERE id = ${latestActiveSchedule.id} - `; + await db.transaction(async (tx) => { + const old = await tx.select().from(snapshotSchedule).where( + eq(snapshotSchedule.id, latestActiveSchedule.id) + ).for("update"); + if (old.length === 0) return; + const oldSchedule = old[0]; + await tx.update(snapshotSchedule).set({ startedAt: adjustedTime.toISOString() }).where( + eq(snapshotSchedule.id, latestActiveSchedule.id) + ); + if (oldSchedule.status !== "pending") return; + const oldWindow = getWindowFromDate(new Date(oldSchedule.startedAt)); + await decrWindowCount(redis, oldWindow); + const window = getWindowFromDate(adjustedTime); + await incrWindowCount(redis, window); + }); logger.log( `Updated snapshot schedule for ${aid} at ${adjustedTime.toISOString()}`, "mq", @@ -250,6 +271,10 @@ export async function scheduleSnapshot( if (type !== "milestone" && type !== "new" && adjustTime) { adjustedTime = await adjustSnapshotTime(new Date(targetTime), 3000, redis); } + else { + const window = getWindowFromDate(adjustedTime); + await incrWindowCount(redis, window); + } logger.log( `Scheduled ${type} snapshot for ${aid} at ${adjustedTime.toISOString()}`, "mq", @@ -284,32 +309,38 @@ export async function adjustSnapshotTime( allowedCounts: number = 1000, redisClient: Redis ): Promise { - const currentWindow = getCurrentWindowIndex(); - const targetOffset = Math.floor((expectedStartTime.getTime() - Date.now()) / (5 * MINUTE)) - 6; - - const initialOffset = currentWindow + Math.max(targetOffset, 0); + const initialWindow = dayjs(getWindowFromDate(expectedStartTime)); const MAX_ITERATIONS = 4032; // 10 days - for (let i = initialOffset; i < MAX_ITERATIONS; i++) { - const offset = i; - const count = await getWindowCount(redisClient, offset); + for (let i = 0; i < MAX_ITERATIONS; i++) { + const window = initialWindow.add(i * WINDOW_SIZE, "milliseconds"); + const newCount = await incrWindowCount(redisClient, window.toDate()); - if (count < allowedCounts) { - await redisClient.hincrby(REDIS_KEY, offset.toString(), 1); - - const startPoint = new Date(); - startPoint.setHours(0, 0, 0, 0); - const startTime = startPoint.getTime(); - const windowStart = startTime + offset * 5 * MINUTE; - const randomDelay = Math.floor(Math.random() * 5 * MINUTE); - const delayedDate = new Date(windowStart + randomDelay); - const now = new Date(); - - if (delayedDate.getTime() < now.getTime()) { - return now; - } - return delayedDate; + if (newCount > allowedCounts) { + await decrWindowCount(redisClient, window.toDate()); + continue; } + + const randomDelay = Math.random() * WINDOW_SIZE; + + const randomizedExecutionTime = window.add(randomDelay, "milliseconds"); + const delayedDate = randomizedExecutionTime.toDate(); + const now = new Date(); + + if (delayedDate.getTime() < now.getTime()) { + return now; + } + return delayedDate; + } + + for (let i = 0; i < 6; i++) { + const window = initialWindow.subtract(i * WINDOW_SIZE, "milliseconds"); + + const newCount = await incrWindowCount(redisClient, window.toDate()); + if (newCount <= allowedCounts) { + return window.toDate(); + } + await decrWindowCount(redisClient, window.toDate()); } return expectedStartTime; } @@ -343,20 +374,49 @@ export async function getBulkSnapshotsInNextSecond(sql: Psql) { `; } -export async function setSnapshotStatus(sql: Psql, id: number, status: string) { - return sql` - UPDATE snapshot_schedule - SET status = ${status} - WHERE id = ${id} - `; +export async function setSnapshotStatus(_sql: Psql, id: number, status: string) { + return await db.transaction(async (tx) => { + const snapshots = await tx + .select() + .from(snapshotSchedule) + .where(eq(snapshotSchedule.id, id)) + .for("update"); + + if (snapshots.length === 0) return; + + const snapshot = snapshots[0]; + + const removeFromPending = snapshot.status === "pending" && status !== "pending"; + + if (removeFromPending) { + const window = getWindowFromDate(new Date(snapshot.startedAt)); + await decrWindowCount(redis, window); + } + + return await tx.update(snapshotSchedule).set({ status }).where(eq(snapshotSchedule.id, id)); + }); } -export async function bulkSetSnapshotStatus(sql: Psql, ids: number[], status: string) { - return sql` - UPDATE snapshot_schedule - SET status = ${status} - WHERE id = ANY (${ids}) - `; +export async function bulkSetSnapshotStatus(_sql: Psql, ids: number[], status: string) { + return await db.transaction(async (tx) => { + const snapshots = await tx + .select() + .from(snapshotSchedule) + .where(inArray(snapshotSchedule.id, ids)) + .for("update"); + + if (snapshots.length === 0) return; + + for (const snapshot of snapshots) { + const removeFromPending = snapshot.status === "pending" && status !== "pending"; + if (removeFromPending) { + const window = getWindowFromDate(new Date(snapshot.startedAt)); + await decrWindowCount(redis, window); + } + } + + return await tx.update(snapshotSchedule).set({ status }).where(inArray(snapshotSchedule.id, ids)); + }); } export async function getVideosWithoutActiveSnapshotScheduleByType(sql: Psql, type: string) {