From 80d0342636056bc839517b64fd11708abea70237 Mon Sep 17 00:00:00 2001 From: memdmp Date: Wed, 20 Aug 2025 12:46:01 +0000 Subject: feat: locks :3 --- src/lib/vendor/async-mutex/withtimeout.ts | 140 ++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 src/lib/vendor/async-mutex/withtimeout.ts (limited to 'src/lib/vendor/async-mutex/withtimeout.ts') diff --git a/src/lib/vendor/async-mutex/withtimeout.ts b/src/lib/vendor/async-mutex/withtimeout.ts new file mode 100644 index 0000000..0e115b9 --- /dev/null +++ b/src/lib/vendor/async-mutex/withtimeout.ts @@ -0,0 +1,140 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { E_TIMEOUT } from './errors'; +import type { MutexInterface } from './interface'; +import type { SemaphoreInterface } from './semaphoreinterface'; + +export function withTimeout( + mutex: MutexInterface, + timeout: number, + timeoutError?: Error +): MutexInterface; +export function withTimeout( + semaphore: SemaphoreInterface, + timeout: number, + timeoutError?: Error +): SemaphoreInterface; +export function withTimeout( + sync: MutexInterface | SemaphoreInterface, + timeout: number, + timeoutError = E_TIMEOUT +): any { + return { + acquire: ( + weightOrPriority?: number, + priority?: number + ): Promise< + MutexInterface.Releaser | [number, SemaphoreInterface.Releaser] + > => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise(async (resolve, reject) => { + let isTimeout = false; + + const handle = setTimeout(() => { + isTimeout = true; + reject(timeoutError); + }, timeout); + + try { + const ticket = await (isSemaphore(sync) + ? sync.acquire(weight, priority) + : sync.acquire(priority)); + if (isTimeout) { + const release = Array.isArray(ticket) ? ticket[1] : ticket; + + release(); + } else { + clearTimeout(handle); + resolve(ticket); + } + } catch (e) { + if (!isTimeout) { + clearTimeout(handle); + + reject(e); + } + } + }); + }, + + async runExclusive( + callback: (value?: number) => Promise | T, + weight?: number, + priority?: number + ): Promise { + let release: () => void = () => undefined; + + try { + const ticket = await this.acquire(weight, priority); + + if (Array.isArray(ticket)) { + release = ticket[1]; + + return await callback(ticket[0]); + } else { + release = ticket; + + return await callback(); + } + } finally { + release(); + } + }, + + release(weight?: number): void { + sync.release(weight); + }, + + cancel(): void { + return sync.cancel(); + }, + + waitForUnlock: ( + weightOrPriority?: number, + priority?: number + ): Promise => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise((resolve, reject) => { + const handle = setTimeout(() => reject(timeoutError), timeout); + (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ).then(() => { + clearTimeout(handle); + resolve(); + }); + }); + }, + + isLocked: (): boolean => sync.isLocked(), + + getValue: (): number => (sync as SemaphoreInterface).getValue(), + + setValue: (value: number) => (sync as SemaphoreInterface).setValue(value), + }; +} + +function isSemaphore( + sync: SemaphoreInterface | MutexInterface +): sync is SemaphoreInterface { + return (sync as SemaphoreInterface).getValue !== undefined; +} -- cgit v1.2.3