From 094f8a09256792a05c2f70d2917b944c908b0210 Mon Sep 17 00:00:00 2001 From: alikia2x Date: Mon, 6 Jan 2025 23:14:42 +0800 Subject: [PATCH] add: scheduler --- src/electron/backend/encoding.ts | 27 ++-- src/electron/backend/scheduler.ts | 235 ++++++++++++++++++++++++++++ src/electron/backend/screenshot.ts | 38 +++-- src/electron/index.ts | 23 +-- src/electron/utils/backend/index.ts | 6 + src/electron/utils/index.ts | 3 +- 6 files changed, 289 insertions(+), 43 deletions(-) create mode 100644 src/electron/backend/scheduler.ts create mode 100644 src/electron/utils/backend/index.ts diff --git a/src/electron/backend/encoding.ts b/src/electron/backend/encoding.ts index 8bb5f30..05cbedb 100644 --- a/src/electron/backend/encoding.ts +++ b/src/electron/backend/encoding.ts @@ -1,4 +1,3 @@ -import { Database } from "better-sqlite3"; import { exec } from "child_process"; import fs from "fs"; import path, { join } from "path"; @@ -8,13 +7,15 @@ import { getEncodeCommand } from "../utils/index.js"; import { getRecordingsDir, getEncodingTempDir, getScreenshotsDir } from "../utils/index.js"; import cache from "memory-cache"; import { ENCODING_FRAME_INTERVAL, RECORD_FRAME_RATE as FRAME_RATE } from "./consts.js"; +import { getDatabase } from "../utils/index.js"; const THREE_MINUTES = 180; const MIN_FRAMES_TO_ENCODE = THREE_MINUTES * FRAME_RATE; const CONCURRENCY = 1; // Detect and insert encoding tasks -export function checkFramesForEncoding(db: Database) { +export function checkFramesForEncoding() { + const db = getDatabase(); const stmt = db.prepare(` SELECT id, imgFilename, createdAt FROM frame @@ -32,12 +33,12 @@ export function checkFramesForEncoding(db: Database) { const lastFramePath = join(getScreenshotsDir(), lastFrame.imgFilename!); if (!fs.existsSync(framePath)) { console.warn("File not exist:", frame.imgFilename); - deleteFrameFromDB(db, frame.id); + deleteFrameFromDB(frame.id); continue; } if (!fs.existsSync(lastFramePath)) { console.warn("File not exist:", lastFrame.imgFilename); - deleteFrameFromDB(db, lastFrame.id); + deleteFrameFromDB(lastFrame.id); continue; } const currentFrameSize = sizeOf(framePath); @@ -73,7 +74,8 @@ export function checkFramesForEncoding(db: Database) { } } -function deleteEncodedScreenshots(db: Database) { +function deleteEncodedScreenshots() { + const db = getDatabase(); // TODO: double-check that the frame was really encoded into the video const stmt = db.prepare(` SELECT * FROM frame WHERE encodeStatus = 2 AND imgFilename IS NOT NULL; @@ -89,7 +91,8 @@ function deleteEncodedScreenshots(db: Database) { } } -function _deleteNonExistentScreenshots(db: Database) { +function _deleteNonExistentScreenshots() { + const db = getDatabase(); const screenshotDir = getScreenshotsDir(); const filesInDir = new Set(fs.readdirSync(screenshotDir)); @@ -107,12 +110,13 @@ function _deleteNonExistentScreenshots(db: Database) { } } -export async function deleteUnnecessaryScreenshots(db: Database) { - deleteEncodedScreenshots(db); - //deleteNonExistentScreenshots(db); +export async function deleteUnnecessaryScreenshots() { + deleteEncodedScreenshots(); + //deleteNonExistentScreenshots(); } -export function deleteFrameFromDB(db: Database, id: number) { +export function deleteFrameFromDB(id: number) { + const db = getDatabase(); const deleteStmt = db.prepare(` DELETE FROM frame WHERE id = ?; `); @@ -136,7 +140,8 @@ function createMetaFile(frames: Frame[]) { } // Check and process encoding task -export function processEncodingTasks(db: Database) { +export function processEncodingTasks() { + const db = getDatabase(); let tasksPerforming = getTasksPerforming(); if (tasksPerforming.length >= CONCURRENCY) return; diff --git a/src/electron/backend/scheduler.ts b/src/electron/backend/scheduler.ts new file mode 100644 index 0000000..485d103 --- /dev/null +++ b/src/electron/backend/scheduler.ts @@ -0,0 +1,235 @@ +type TaskId = string; +type TaskFunction = () => void; + +interface Task { + id: TaskId; + func: TaskFunction; + interval?: number; + maxInterval?: number; + lastRun?: number; + nextRun?: number; + isPaused: boolean; + delayUntil?: number; +} + +export interface TaskStatus { + status: "NOT_FOUND" | "PAUSED" | "DELAYED" | "SCHEDULED" | "IDLE"; + until?: string; + nextRun?: string; +} + +export class Scheduler { + private tasks: Map = new Map(); + private timer: NodeJS.Timeout | null = null; + private nextTickTime: number | null = null; + + constructor(private readonly minTickInterval: number = 500) { + this.start(); + } + + private start(): void { + this.scheduleNextTick(); + } + + private scheduleNextTick(): void { + if (this.timer) { + clearTimeout(this.timer); + } + + const now = Date.now(); + let nextTick = now + this.minTickInterval; + + for (const task of this.tasks.values()) { + const isTaskPaused = task.isPaused; + const isTaskDelayed = task.delayUntil && now < task.delayUntil; + if (isTaskPaused || isTaskDelayed) { + continue; + } + + const nextTaskEarlierThanNextTick = task.nextRun && task.nextRun < nextTick; + if (nextTaskEarlierThanNextTick) { + nextTick = task.nextRun!; + } + } + + const delay = Math.max(0, nextTick - now); + this.timer = setTimeout(() => this.tick(), delay); + } + + private tickSingleTask( + task: Task, + getNextTick: () => number, + updateNextTick: (nextTick: number) => void + ): void { + const now = Date.now(); + const isTaskPaused = task.isPaused; + const isTaskDelayed = task.delayUntil && now < task.delayUntil; + + if (isTaskPaused || isTaskDelayed) { + return; + } + + const isTaskReadyForIntervalRun = task.interval && task.nextRun && now >= task.nextRun; + if (isTaskReadyForIntervalRun) { + task.func(); + task.lastRun = now; + task.nextRun = now + task.interval!; + } + + const isTaskReadyForMaxIntervalRun = + task.maxInterval && task.lastRun && now - task.lastRun >= task.maxInterval; + if (isTaskReadyForMaxIntervalRun) { + task.func(); + task.lastRun = now; + if (task.interval) { + task.nextRun = now + task.interval; + } + } + + const isTaskNextRunEarlierThanNextTick = task.nextRun && task.nextRun < getNextTick(); + if (isTaskNextRunEarlierThanNextTick) { + updateNextTick(task.nextRun!); + } + } + + private tick(): void { + const now = Date.now(); + let nextTick = now + this.minTickInterval; + + for (const task of this.tasks.values()) { + this.tickSingleTask( + task, + () => nextTick, + (v) => (nextTick = v) + ); + } + + this.scheduleNextTick(); + } + + /** + * Add a new task to the scheduler. + * + * @param id A unique string identifier for the task. + * @param func The function to be executed by the task. + * @param interval The interval (in milliseconds) between task executions. + * @param maxInterval The maximum time (in milliseconds) that a task can wait before being executed. + * If a task has not been executed in this amount of time, it will be executed immediately. + */ + addTask(id: TaskId, func: TaskFunction, interval?: number, maxInterval?: number): void { + this.tasks.set(id, { + id, + func, + interval, + maxInterval, + isPaused: false, + lastRun: undefined, + nextRun: interval ? Date.now() + interval : undefined + }); + + this.scheduleNextTick(); + } + + /** + * Trigger a task to execute immediately, regardless of its current state. + * + * If the task is paused or delayed, it will not be executed. + * + * @param id The unique string identifier for the task. + */ + triggerTask(id: TaskId): void { + const task = this.tasks.get(id); + if (task && !task.isPaused && (!task.delayUntil || Date.now() >= task.delayUntil)) { + task.func(); + task.lastRun = Date.now(); + if (task.interval) { + task.nextRun = Date.now() + task.interval; + } + } + + this.scheduleNextTick(); + } + + /** + * Pause a task, so that it will not be executed until it is resumed. + * + * @param id The unique string identifier for the task. + */ + pauseTask(id: TaskId): void { + const task = this.tasks.get(id); + if (task) { + task.isPaused = true; + } + + this.scheduleNextTick(); + } + + /** + * Resume a paused task, so that it can be executed according to its interval and maxInterval. + * + * @param id The unique string identifier for the task. + */ + resumeTask(id: TaskId): void { + const task = this.tasks.get(id); + if (task) { + task.isPaused = false; + } + + this.scheduleNextTick(); + } + + /** + * Delay a task from being executed for a specified amount of time. + * + * @param id The unique string identifier for the task. + * @param delayMs The amount of time in milliseconds to delay the task's execution. + */ + delayTask(id: TaskId, delayMs: number): void { + const task = this.tasks.get(id); + if (task) { + task.delayUntil = Date.now() + delayMs; + } + + this.scheduleNextTick(); + } + + setTaskInterval(id: TaskId, interval: number): void { + const task = this.tasks.get(id); + if (task) { + task.interval = interval; + task.nextRun = Date.now() + interval; + } + + this.scheduleNextTick(); + } + + getTaskStatus(id: TaskId): TaskStatus { + const task = this.tasks.get(id); + if (!task) { + return { status: "NOT_FOUND" }; + } + if (task.isPaused) { + return { status: "PAUSED" }; + } + if (task.delayUntil && Date.now() < task.delayUntil) { + return { + status: "DELAYED", + until: new Date(task.delayUntil).toLocaleString() + }; + } + if (task.nextRun) { + return { + status: "SCHEDULED", + nextRun: new Date(task.nextRun).toLocaleString() + }; + } + return { status: "IDLE" }; + } + + stop(): void { + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + } +} diff --git a/src/electron/backend/screenshot.ts b/src/electron/backend/screenshot.ts index ce2a074..da5a64a 100644 --- a/src/electron/backend/screenshot.ts +++ b/src/electron/backend/screenshot.ts @@ -1,25 +1,23 @@ import screenshot from "screenshot-desktop"; -import { getScreenshotsDir } from "../utils/index.js"; +import { getDatabase, getScreenshotsDir } from "../utils/index.js"; import { join } from "path"; -import { Database } from "better-sqlite3"; import SqlString from "sqlstring"; -export function startScreenshotLoop(db: Database) { - return setInterval(() => { - const timestamp = new Date().getTime(); - const screenshotDir = getScreenshotsDir(); - const filename = `${timestamp}.png`; - const screenshotPath = join(screenshotDir, filename); - screenshot({ filename: screenshotPath, format: "png" }) - .then(() => { - const SQL = SqlString.format( - "INSERT INTO frame (imgFilename, createdAt) VALUES (?, ?)", - [filename, new Date().getTime() / 1000] - ); - db.exec(SQL); - }) - .catch((err) => { - console.error(err); - }); - }, 2000); +export function takeScreenshot() { + const db = getDatabase(); + const timestamp = new Date().getTime(); + const screenshotDir = getScreenshotsDir(); + const filename = `${timestamp}.png`; + const screenshotPath = join(screenshotDir, filename); + screenshot({ filename: screenshotPath, format: "png" }) + .then(() => { + const SQL = SqlString.format( + "INSERT INTO frame (imgFilename, createdAt) VALUES (?, ?)", + [filename, new Date().getTime() / 1000] + ); + db.exec(SQL); + }) + .catch((err) => { + console.error(err); + }); } diff --git a/src/electron/index.ts b/src/electron/index.ts index 066b22f..3a64ce9 100644 --- a/src/electron/index.ts +++ b/src/electron/index.ts @@ -14,7 +14,7 @@ import initI18n from "./i18n.js"; import { createMainWindow, createSettingsWindow } from "./createWindow.js"; import { initDatabase } from "./backend/init.js"; import { Database } from "better-sqlite3"; -import { startScreenshotLoop } from "./backend/screenshot.js"; +import { takeScreenshot } from "./backend/screenshot.js"; import { __dirname } from "./dirname.js"; import { hideDock } from "./utils/index.js"; import { @@ -27,16 +27,17 @@ import { serve } from "@hono/node-server"; import { findAvailablePort } from "./utils/index.js"; import cache from "memory-cache"; import { generate as generateAPIKey } from "@alikia/random-key"; +import { Scheduler } from "./backend/scheduler.js"; const i18n = initI18n(); const t = i18n.t.bind(i18n); const port = process.env.PORT || "5173"; const dev = !app.isPackaged; +const scheduler = new Scheduler(); let tray: null | Tray = null; let dbConnection: null | Database = null; -let screenshotInterval: null | NodeJS.Timeout = null; let mainWindow: BrowserWindow | null; let settingsWindow: BrowserWindow | null; @@ -109,10 +110,10 @@ app.on("ready", () => { }); }); initDatabase().then((db) => { - screenshotInterval = startScreenshotLoop(db); - setInterval(checkFramesForEncoding, 5000, db); - setInterval(processEncodingTasks, 10000, db); - setInterval(deleteUnnecessaryScreenshots, 20000, db); + scheduler.addTask("screenshot", takeScreenshot, 2000, 2000); + scheduler.addTask("check-encoding", checkFramesForEncoding, 5000, 10000); + scheduler.addTask("process-encoding", processEncodingTasks, 10000, 30000); + scheduler.addTask("delete-screenshots", deleteUnnecessaryScreenshots, 20000, 60000); dbConnection = db; cache.put("server:dbConnection", dbConnection); }); @@ -126,7 +127,7 @@ app.on("ready", () => { app.on("will-quit", () => { dbConnection?.close(); - if (screenshotInterval) clearInterval(screenshotInterval); + scheduler.stop(); }); ipcMain.on("close-settings", () => { @@ -134,8 +135,8 @@ ipcMain.on("close-settings", () => { }); ipcMain.handle("request-api-info", () => { - return { - port: cache.get("server:port"), - apiKey: cache.get("server:APIKey") - }; + return { + port: cache.get("server:port"), + apiKey: cache.get("server:APIKey") + }; }); diff --git a/src/electron/utils/backend/index.ts b/src/electron/utils/backend/index.ts new file mode 100644 index 0000000..64e9497 --- /dev/null +++ b/src/electron/utils/backend/index.ts @@ -0,0 +1,6 @@ +import { Database } from "better-sqlite3"; +import cache from "memory-cache"; + +export function getDatabase(): Database { + return cache.get("server:dbConnection"); +} diff --git a/src/electron/utils/index.ts b/src/electron/utils/index.ts index 68f2c17..1482d9e 100644 --- a/src/electron/utils/index.ts +++ b/src/electron/utils/index.ts @@ -2,4 +2,5 @@ export * from "./fs/index.js"; export * from "./platform/index.js"; export * from "./video/index.js"; export * from "./network/index.js"; -export * from "./logging/index.js"; \ No newline at end of file +export * from "./logging/index.js"; +export * from "./backend/index.js"; \ No newline at end of file