diff options
Diffstat (limited to 'src/lib/vendor/async-mutex')
| -rw-r--r-- | src/lib/vendor/async-mutex/README | 1 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/errors.ts | 5 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/index.ts | 7 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/interface.ts | 26 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/mutex.ts | 41 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/semaphore.ts | 176 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/semaphoreinterface.ts | 34 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/tryaquire.ts | 21 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/withtimeout.ts | 140 |
9 files changed, 451 insertions, 0 deletions
diff --git a/src/lib/vendor/async-mutex/README b/src/lib/vendor/async-mutex/README new file mode 100644 index 0000000..73714b7 --- /dev/null +++ b/src/lib/vendor/async-mutex/README @@ -0,0 +1 @@ +https://github.com/DirtyHairy/async-mutex
\ No newline at end of file diff --git a/src/lib/vendor/async-mutex/errors.ts b/src/lib/vendor/async-mutex/errors.ts new file mode 100644 index 0000000..02e3eb0 --- /dev/null +++ b/src/lib/vendor/async-mutex/errors.ts @@ -0,0 +1,5 @@ +export const E_TIMEOUT = new Error( + 'timeout while waiting for mutex to become available' +); +export const E_ALREADY_LOCKED = new Error('mutex already locked'); +export const E_CANCELED = new Error('request for lock canceled'); diff --git a/src/lib/vendor/async-mutex/index.ts b/src/lib/vendor/async-mutex/index.ts new file mode 100644 index 0000000..a309591 --- /dev/null +++ b/src/lib/vendor/async-mutex/index.ts @@ -0,0 +1,7 @@ +export { default as Mutex } from './mutex'; +export type { MutexInterface } from './interface'; +export { default as Semaphore } from './semaphore'; +export type { SemaphoreInterface } from './semaphoreinterface'; +export { withTimeout } from './withtimeout'; +export { tryAcquire } from './tryaquire'; +export * from './errors'; diff --git a/src/lib/vendor/async-mutex/interface.ts b/src/lib/vendor/async-mutex/interface.ts new file mode 100644 index 0000000..736b338 --- /dev/null +++ b/src/lib/vendor/async-mutex/interface.ts @@ -0,0 +1,26 @@ +export interface MutexInterface { + acquire(priority?: number): Promise<MutexInterface.Releaser>; + + runExclusive<T>( + callback: MutexInterface.Worker<T>, + priority?: number + ): Promise<T>; + + waitForUnlock(priority?: number): Promise<void>; + + isLocked(): boolean; + + release(): void; + + cancel(): void; +} + +export namespace MutexInterface { + export interface Releaser { + (): void; + } + + export interface Worker<T> { + (): Promise<T> | T; + } +} diff --git a/src/lib/vendor/async-mutex/mutex.ts b/src/lib/vendor/async-mutex/mutex.ts new file mode 100644 index 0000000..9fe6e7a --- /dev/null +++ b/src/lib/vendor/async-mutex/mutex.ts @@ -0,0 +1,41 @@ +import type { MutexInterface } from './interface'; +import Semaphore from './semaphore'; + +class Mutex implements MutexInterface { + constructor(cancelError?: Error) { + this._semaphore = new Semaphore(1, cancelError); + } + + async acquire(priority = 0): Promise<MutexInterface.Releaser> { + const [, releaser] = await this._semaphore.acquire(1, priority); + + return releaser; + } + + runExclusive<T>( + callback: MutexInterface.Worker<T>, + priority = 0 + ): Promise<T> { + return this._semaphore.runExclusive(() => callback(), 1, priority); + } + + isLocked(): boolean { + return this._semaphore.isLocked(); + } + + waitForUnlock(priority = 0): Promise<void> { + return this._semaphore.waitForUnlock(1, priority); + } + + release(): void { + if (this._semaphore.isLocked()) this._semaphore.release(); + } + + cancel(): void { + return this._semaphore.cancel(); + } + + private _semaphore: Semaphore; +} + +export default Mutex; diff --git a/src/lib/vendor/async-mutex/semaphore.ts b/src/lib/vendor/async-mutex/semaphore.ts new file mode 100644 index 0000000..385e814 --- /dev/null +++ b/src/lib/vendor/async-mutex/semaphore.ts @@ -0,0 +1,176 @@ +import { E_CANCELED } from './errors'; +import type { SemaphoreInterface } from './semaphoreinterface'; + +interface Priority { + priority: number; +} + +interface QueueEntry { + resolve(result: [number, SemaphoreInterface.Releaser]): void; + reject(error: unknown): void; + weight: number; + priority: number; +} + +interface Waiter { + resolve(): void; + priority: number; +} + +class Semaphore implements SemaphoreInterface { + constructor( + private _value: number, + private _cancelError: Error = E_CANCELED + ) {} + + acquire( + weight = 1, + priority = 0 + ): Promise<[number, SemaphoreInterface.Releaser]> { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + return new Promise((resolve, reject) => { + const task: QueueEntry = { resolve, reject, weight, priority }; + const i = findIndexFromEnd( + this._queue, + (other) => priority <= other.priority + ); + if (i === -1 && weight <= this._value) { + // Needs immediate dispatch, skip the queue + this._dispatchItem(task); + } else { + this._queue.splice(i + 1, 0, task); + } + }); + } + + async runExclusive<T>( + callback: SemaphoreInterface.Worker<T>, + weight = 1, + priority = 0 + ): Promise<T> { + const [value, release] = await this.acquire(weight, priority); + + try { + return await callback(value); + } finally { + release(); + } + } + + waitForUnlock(weight = 1, priority = 0): Promise<void> { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + if (this._couldLockImmediately(weight, priority)) { + return Promise.resolve(); + } else { + return new Promise((resolve) => { + if (!this._weightedWaiters[weight - 1]) + this._weightedWaiters[weight - 1] = []; + insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); + }); + } + } + + isLocked(): boolean { + return this._value <= 0; + } + + getValue(): number { + return this._value; + } + + setValue(value: number): void { + this._value = value; + this._dispatchQueue(); + } + + release(weight = 1): void { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + this._value += weight; + this._dispatchQueue(); + } + + cancel(): void { + this._queue.forEach((entry) => entry.reject(this._cancelError)); + this._queue = []; + } + + private _dispatchQueue(): void { + this._drainUnlockWaiters(); + while (this._queue.length > 0 && this._queue[0].weight <= this._value) { + this._dispatchItem(this._queue.shift()!); + this._drainUnlockWaiters(); + } + } + + private _dispatchItem(item: QueueEntry): void { + const previousValue = this._value; + this._value -= item.weight; + item.resolve([previousValue, this._newReleaser(item.weight)]); + } + + private _newReleaser(weight: number): () => void { + let called = false; + + return () => { + if (called) return; + called = true; + + this.release(weight); + }; + } + + private _drainUnlockWaiters(): void { + if (this._queue.length === 0) { + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + waiters.forEach((waiter) => waiter.resolve()); + this._weightedWaiters[weight - 1] = []; + } + } else { + const queuedPriority = this._queue[0].priority; + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + const i = waiters.findIndex( + (waiter) => waiter.priority <= queuedPriority + ); + (i === -1 ? waiters : waiters.splice(0, i)).forEach((waiter) => + waiter.resolve() + ); + } + } + } + + private _couldLockImmediately(weight: number, priority: number) { + return ( + (this._queue.length === 0 || this._queue[0].priority < priority) && + weight <= this._value + ); + } + + private _queue: Array<QueueEntry> = []; + private _weightedWaiters: Array<Array<Waiter>> = []; +} + +function insertSorted<T extends Priority>(a: T[], v: T) { + const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); + a.splice(i + 1, 0, v); +} + +function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number { + for (let i = a.length - 1; i >= 0; i--) { + if (predicate(a[i])) { + return i; + } + } + return -1; +} + +export default Semaphore; diff --git a/src/lib/vendor/async-mutex/semaphoreinterface.ts b/src/lib/vendor/async-mutex/semaphoreinterface.ts new file mode 100644 index 0000000..bbbdc6b --- /dev/null +++ b/src/lib/vendor/async-mutex/semaphoreinterface.ts @@ -0,0 +1,34 @@ +export interface SemaphoreInterface { + acquire( + weight?: number, + priority?: number + ): Promise<[number, SemaphoreInterface.Releaser]>; + + runExclusive<T>( + callback: SemaphoreInterface.Worker<T>, + weight?: number, + priority?: number + ): Promise<T>; + + waitForUnlock(weight?: number, priority?: number): Promise<void>; + + isLocked(): boolean; + + getValue(): number; + + setValue(value: number): void; + + release(weight?: number): void; + + cancel(): void; +} + +export namespace SemaphoreInterface { + export interface Releaser { + (): void; + } + + export interface Worker<T> { + (value: number): Promise<T> | T; + } +} diff --git a/src/lib/vendor/async-mutex/tryaquire.ts b/src/lib/vendor/async-mutex/tryaquire.ts new file mode 100644 index 0000000..359bafa --- /dev/null +++ b/src/lib/vendor/async-mutex/tryaquire.ts @@ -0,0 +1,21 @@ +import { E_ALREADY_LOCKED } from './errors'; +import MutexInterface from './mutex'; +import type { SemaphoreInterface } from './semaphoreinterface'; +import { withTimeout } from './withtimeout'; + +export function tryAcquire( + mutex: MutexInterface, + alreadyAcquiredError?: Error +): MutexInterface; +export function tryAcquire( + semaphore: SemaphoreInterface, + alreadyAcquiredError?: Error +): SemaphoreInterface; +// eslint-disable-next-lisne @typescript-eslint/explicit-module-boundary-types +export function tryAcquire( + sync: MutexInterface | SemaphoreInterface, + alreadyAcquiredError = E_ALREADY_LOCKED +): typeof sync { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return withTimeout(sync as any, 0, alreadyAcquiredError); +} diff --git a/src/lib/vendor/async-mutex/withtimeout.ts b/src/lib/vendor/async-mutex/withtimeout.ts new file mode 100644 index 0000000..0e115b9 --- /dev/null +++ b/src/lib/vendor/async-mutex/withtimeout.ts @@ -0,0 +1,140 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { E_TIMEOUT } from './errors'; +import type { MutexInterface } from './interface'; +import type { SemaphoreInterface } from './semaphoreinterface'; + +export function withTimeout( + mutex: MutexInterface, + timeout: number, + timeoutError?: Error +): MutexInterface; +export function withTimeout( + semaphore: SemaphoreInterface, + timeout: number, + timeoutError?: Error +): SemaphoreInterface; +export function withTimeout( + sync: MutexInterface | SemaphoreInterface, + timeout: number, + timeoutError = E_TIMEOUT +): any { + return { + acquire: ( + weightOrPriority?: number, + priority?: number + ): Promise< + MutexInterface.Releaser | [number, SemaphoreInterface.Releaser] + > => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise(async (resolve, reject) => { + let isTimeout = false; + + const handle = setTimeout(() => { + isTimeout = true; + reject(timeoutError); + }, timeout); + + try { + const ticket = await (isSemaphore(sync) + ? sync.acquire(weight, priority) + : sync.acquire(priority)); + if (isTimeout) { + const release = Array.isArray(ticket) ? ticket[1] : ticket; + + release(); + } else { + clearTimeout(handle); + resolve(ticket); + } + } catch (e) { + if (!isTimeout) { + clearTimeout(handle); + + reject(e); + } + } + }); + }, + + async runExclusive<T>( + callback: (value?: number) => Promise<T> | T, + weight?: number, + priority?: number + ): Promise<T> { + let release: () => void = () => undefined; + + try { + const ticket = await this.acquire(weight, priority); + + if (Array.isArray(ticket)) { + release = ticket[1]; + + return await callback(ticket[0]); + } else { + release = ticket; + + return await callback(); + } + } finally { + release(); + } + }, + + release(weight?: number): void { + sync.release(weight); + }, + + cancel(): void { + return sync.cancel(); + }, + + waitForUnlock: ( + weightOrPriority?: number, + priority?: number + ): Promise<void> => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise((resolve, reject) => { + const handle = setTimeout(() => reject(timeoutError), timeout); + (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ).then(() => { + clearTimeout(handle); + resolve(); + }); + }); + }, + + isLocked: (): boolean => sync.isLocked(), + + getValue: (): number => (sync as SemaphoreInterface).getValue(), + + setValue: (value: number) => (sync as SemaphoreInterface).setValue(value), + }; +} + +function isSemaphore( + sync: SemaphoreInterface | MutexInterface +): sync is SemaphoreInterface { + return (sync as SemaphoreInterface).getValue !== undefined; +} |