diff options
feat: locks :3
| -rw-r--r-- | src/lib/vendor/async-mutex/README | 1 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/errors.ts | 5 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/index.ts | 7 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/interface.ts | 26 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/mutex.ts | 41 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/semaphore.ts | 176 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/semaphoreinterface.ts | 34 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/tryaquire.ts | 21 | ||||
| -rw-r--r-- | src/lib/vendor/async-mutex/withtimeout.ts | 140 | ||||
| -rw-r--r-- | src/lib/vendor/lock.ts | 252 |
10 files changed, 703 insertions, 0 deletions
diff --git a/src/lib/vendor/async-mutex/README b/src/lib/vendor/async-mutex/README new file mode 100644 index 0000000..73714b7 --- /dev/null +++ b/src/lib/vendor/async-mutex/README @@ -0,0 +1 @@ +https://github.com/DirtyHairy/async-mutex
\ No newline at end of file diff --git a/src/lib/vendor/async-mutex/errors.ts b/src/lib/vendor/async-mutex/errors.ts new file mode 100644 index 0000000..02e3eb0 --- /dev/null +++ b/src/lib/vendor/async-mutex/errors.ts @@ -0,0 +1,5 @@ +export const E_TIMEOUT = new Error( + 'timeout while waiting for mutex to become available' +); +export const E_ALREADY_LOCKED = new Error('mutex already locked'); +export const E_CANCELED = new Error('request for lock canceled'); diff --git a/src/lib/vendor/async-mutex/index.ts b/src/lib/vendor/async-mutex/index.ts new file mode 100644 index 0000000..a309591 --- /dev/null +++ b/src/lib/vendor/async-mutex/index.ts @@ -0,0 +1,7 @@ +export { default as Mutex } from './mutex'; +export type { MutexInterface } from './interface'; +export { default as Semaphore } from './semaphore'; +export type { SemaphoreInterface } from './semaphoreinterface'; +export { withTimeout } from './withtimeout'; +export { tryAcquire } from './tryaquire'; +export * from './errors'; diff --git a/src/lib/vendor/async-mutex/interface.ts b/src/lib/vendor/async-mutex/interface.ts new file mode 100644 index 0000000..736b338 --- /dev/null +++ b/src/lib/vendor/async-mutex/interface.ts @@ -0,0 +1,26 @@ +export interface MutexInterface { + acquire(priority?: number): Promise<MutexInterface.Releaser>; + + runExclusive<T>( + callback: MutexInterface.Worker<T>, + priority?: number + ): Promise<T>; + + waitForUnlock(priority?: number): Promise<void>; + + isLocked(): boolean; + + release(): void; + + cancel(): void; +} + +export namespace MutexInterface { + export interface Releaser { + (): void; + } + + export interface Worker<T> { + (): Promise<T> | T; + } +} diff --git a/src/lib/vendor/async-mutex/mutex.ts b/src/lib/vendor/async-mutex/mutex.ts new file mode 100644 index 0000000..9fe6e7a --- /dev/null +++ b/src/lib/vendor/async-mutex/mutex.ts @@ -0,0 +1,41 @@ +import type { MutexInterface } from './interface'; +import Semaphore from './semaphore'; + +class Mutex implements MutexInterface { + constructor(cancelError?: Error) { + this._semaphore = new Semaphore(1, cancelError); + } + + async acquire(priority = 0): Promise<MutexInterface.Releaser> { + const [, releaser] = await this._semaphore.acquire(1, priority); + + return releaser; + } + + runExclusive<T>( + callback: MutexInterface.Worker<T>, + priority = 0 + ): Promise<T> { + return this._semaphore.runExclusive(() => callback(), 1, priority); + } + + isLocked(): boolean { + return this._semaphore.isLocked(); + } + + waitForUnlock(priority = 0): Promise<void> { + return this._semaphore.waitForUnlock(1, priority); + } + + release(): void { + if (this._semaphore.isLocked()) this._semaphore.release(); + } + + cancel(): void { + return this._semaphore.cancel(); + } + + private _semaphore: Semaphore; +} + +export default Mutex; diff --git a/src/lib/vendor/async-mutex/semaphore.ts b/src/lib/vendor/async-mutex/semaphore.ts new file mode 100644 index 0000000..385e814 --- /dev/null +++ b/src/lib/vendor/async-mutex/semaphore.ts @@ -0,0 +1,176 @@ +import { E_CANCELED } from './errors'; +import type { SemaphoreInterface } from './semaphoreinterface'; + +interface Priority { + priority: number; +} + +interface QueueEntry { + resolve(result: [number, SemaphoreInterface.Releaser]): void; + reject(error: unknown): void; + weight: number; + priority: number; +} + +interface Waiter { + resolve(): void; + priority: number; +} + +class Semaphore implements SemaphoreInterface { + constructor( + private _value: number, + private _cancelError: Error = E_CANCELED + ) {} + + acquire( + weight = 1, + priority = 0 + ): Promise<[number, SemaphoreInterface.Releaser]> { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + return new Promise((resolve, reject) => { + const task: QueueEntry = { resolve, reject, weight, priority }; + const i = findIndexFromEnd( + this._queue, + (other) => priority <= other.priority + ); + if (i === -1 && weight <= this._value) { + // Needs immediate dispatch, skip the queue + this._dispatchItem(task); + } else { + this._queue.splice(i + 1, 0, task); + } + }); + } + + async runExclusive<T>( + callback: SemaphoreInterface.Worker<T>, + weight = 1, + priority = 0 + ): Promise<T> { + const [value, release] = await this.acquire(weight, priority); + + try { + return await callback(value); + } finally { + release(); + } + } + + waitForUnlock(weight = 1, priority = 0): Promise<void> { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + if (this._couldLockImmediately(weight, priority)) { + return Promise.resolve(); + } else { + return new Promise((resolve) => { + if (!this._weightedWaiters[weight - 1]) + this._weightedWaiters[weight - 1] = []; + insertSorted(this._weightedWaiters[weight - 1], { resolve, priority }); + }); + } + } + + isLocked(): boolean { + return this._value <= 0; + } + + getValue(): number { + return this._value; + } + + setValue(value: number): void { + this._value = value; + this._dispatchQueue(); + } + + release(weight = 1): void { + if (weight <= 0) + throw new Error(`invalid weight ${weight}: must be positive`); + + this._value += weight; + this._dispatchQueue(); + } + + cancel(): void { + this._queue.forEach((entry) => entry.reject(this._cancelError)); + this._queue = []; + } + + private _dispatchQueue(): void { + this._drainUnlockWaiters(); + while (this._queue.length > 0 && this._queue[0].weight <= this._value) { + this._dispatchItem(this._queue.shift()!); + this._drainUnlockWaiters(); + } + } + + private _dispatchItem(item: QueueEntry): void { + const previousValue = this._value; + this._value -= item.weight; + item.resolve([previousValue, this._newReleaser(item.weight)]); + } + + private _newReleaser(weight: number): () => void { + let called = false; + + return () => { + if (called) return; + called = true; + + this.release(weight); + }; + } + + private _drainUnlockWaiters(): void { + if (this._queue.length === 0) { + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + waiters.forEach((waiter) => waiter.resolve()); + this._weightedWaiters[weight - 1] = []; + } + } else { + const queuedPriority = this._queue[0].priority; + for (let weight = this._value; weight > 0; weight--) { + const waiters = this._weightedWaiters[weight - 1]; + if (!waiters) continue; + const i = waiters.findIndex( + (waiter) => waiter.priority <= queuedPriority + ); + (i === -1 ? waiters : waiters.splice(0, i)).forEach((waiter) => + waiter.resolve() + ); + } + } + } + + private _couldLockImmediately(weight: number, priority: number) { + return ( + (this._queue.length === 0 || this._queue[0].priority < priority) && + weight <= this._value + ); + } + + private _queue: Array<QueueEntry> = []; + private _weightedWaiters: Array<Array<Waiter>> = []; +} + +function insertSorted<T extends Priority>(a: T[], v: T) { + const i = findIndexFromEnd(a, (other) => v.priority <= other.priority); + a.splice(i + 1, 0, v); +} + +function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number { + for (let i = a.length - 1; i >= 0; i--) { + if (predicate(a[i])) { + return i; + } + } + return -1; +} + +export default Semaphore; diff --git a/src/lib/vendor/async-mutex/semaphoreinterface.ts b/src/lib/vendor/async-mutex/semaphoreinterface.ts new file mode 100644 index 0000000..bbbdc6b --- /dev/null +++ b/src/lib/vendor/async-mutex/semaphoreinterface.ts @@ -0,0 +1,34 @@ +export interface SemaphoreInterface { + acquire( + weight?: number, + priority?: number + ): Promise<[number, SemaphoreInterface.Releaser]>; + + runExclusive<T>( + callback: SemaphoreInterface.Worker<T>, + weight?: number, + priority?: number + ): Promise<T>; + + waitForUnlock(weight?: number, priority?: number): Promise<void>; + + isLocked(): boolean; + + getValue(): number; + + setValue(value: number): void; + + release(weight?: number): void; + + cancel(): void; +} + +export namespace SemaphoreInterface { + export interface Releaser { + (): void; + } + + export interface Worker<T> { + (value: number): Promise<T> | T; + } +} diff --git a/src/lib/vendor/async-mutex/tryaquire.ts b/src/lib/vendor/async-mutex/tryaquire.ts new file mode 100644 index 0000000..359bafa --- /dev/null +++ b/src/lib/vendor/async-mutex/tryaquire.ts @@ -0,0 +1,21 @@ +import { E_ALREADY_LOCKED } from './errors'; +import MutexInterface from './mutex'; +import type { SemaphoreInterface } from './semaphoreinterface'; +import { withTimeout } from './withtimeout'; + +export function tryAcquire( + mutex: MutexInterface, + alreadyAcquiredError?: Error +): MutexInterface; +export function tryAcquire( + semaphore: SemaphoreInterface, + alreadyAcquiredError?: Error +): SemaphoreInterface; +// eslint-disable-next-lisne @typescript-eslint/explicit-module-boundary-types +export function tryAcquire( + sync: MutexInterface | SemaphoreInterface, + alreadyAcquiredError = E_ALREADY_LOCKED +): typeof sync { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return withTimeout(sync as any, 0, alreadyAcquiredError); +} diff --git a/src/lib/vendor/async-mutex/withtimeout.ts b/src/lib/vendor/async-mutex/withtimeout.ts new file mode 100644 index 0000000..0e115b9 --- /dev/null +++ b/src/lib/vendor/async-mutex/withtimeout.ts @@ -0,0 +1,140 @@ +/* eslint-disable @typescript-eslint/no-explicit-any */ +import { E_TIMEOUT } from './errors'; +import type { MutexInterface } from './interface'; +import type { SemaphoreInterface } from './semaphoreinterface'; + +export function withTimeout( + mutex: MutexInterface, + timeout: number, + timeoutError?: Error +): MutexInterface; +export function withTimeout( + semaphore: SemaphoreInterface, + timeout: number, + timeoutError?: Error +): SemaphoreInterface; +export function withTimeout( + sync: MutexInterface | SemaphoreInterface, + timeout: number, + timeoutError = E_TIMEOUT +): any { + return { + acquire: ( + weightOrPriority?: number, + priority?: number + ): Promise< + MutexInterface.Releaser | [number, SemaphoreInterface.Releaser] + > => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise(async (resolve, reject) => { + let isTimeout = false; + + const handle = setTimeout(() => { + isTimeout = true; + reject(timeoutError); + }, timeout); + + try { + const ticket = await (isSemaphore(sync) + ? sync.acquire(weight, priority) + : sync.acquire(priority)); + if (isTimeout) { + const release = Array.isArray(ticket) ? ticket[1] : ticket; + + release(); + } else { + clearTimeout(handle); + resolve(ticket); + } + } catch (e) { + if (!isTimeout) { + clearTimeout(handle); + + reject(e); + } + } + }); + }, + + async runExclusive<T>( + callback: (value?: number) => Promise<T> | T, + weight?: number, + priority?: number + ): Promise<T> { + let release: () => void = () => undefined; + + try { + const ticket = await this.acquire(weight, priority); + + if (Array.isArray(ticket)) { + release = ticket[1]; + + return await callback(ticket[0]); + } else { + release = ticket; + + return await callback(); + } + } finally { + release(); + } + }, + + release(weight?: number): void { + sync.release(weight); + }, + + cancel(): void { + return sync.cancel(); + }, + + waitForUnlock: ( + weightOrPriority?: number, + priority?: number + ): Promise<void> => { + let weight: number | undefined; + if (isSemaphore(sync)) { + weight = weightOrPriority; + } else { + weight = undefined; + priority = weightOrPriority; + } + if (weight !== undefined && weight <= 0) { + throw new Error(`invalid weight ${weight}: must be positive`); + } + + return new Promise((resolve, reject) => { + const handle = setTimeout(() => reject(timeoutError), timeout); + (isSemaphore(sync) + ? sync.waitForUnlock(weight, priority) + : sync.waitForUnlock(priority) + ).then(() => { + clearTimeout(handle); + resolve(); + }); + }); + }, + + isLocked: (): boolean => sync.isLocked(), + + getValue: (): number => (sync as SemaphoreInterface).getValue(), + + setValue: (value: number) => (sync as SemaphoreInterface).setValue(value), + }; +} + +function isSemaphore( + sync: SemaphoreInterface | MutexInterface +): sync is SemaphoreInterface { + return (sync as SemaphoreInterface).getValue !== undefined; +} diff --git a/src/lib/vendor/lock.ts b/src/lib/vendor/lock.ts new file mode 100644 index 0000000..43249cd --- /dev/null +++ b/src/lib/vendor/lock.ts @@ -0,0 +1,252 @@ +import { Mutex, type MutexInterface } from './async-mutex'; + +export interface RWLockInterface { + readonly readerCount: number; + readonly writerCount: number; + acquireRead(priority?: number): Promise<() => void>; + acquireWrite(priority?: number): Promise<() => void>; + withRead<T>(f: () => Promise<T>): Promise<T>; + withWrite<T>(f: () => Promise<T>): Promise<T>; + isLocked(): boolean; + waitForUnlock(priority?: number): Promise<void>; +} + +/** RWLock Helper Methods + Base Interface */ +export abstract class RWLockAbstract implements RWLockInterface { + protected abstract _readerCount: number; + protected abstract _writerCount: number; + public get readerCount(): number { + return this._readerCount; + } + public get writerCount(): number { + return this._writerCount; + } + public abstract acquireRead(priority?: number): Promise<() => void>; + public abstract acquireWrite(priority?: number): Promise<() => void>; + public async withRead<T>(f: () => Promise<T>): Promise<T> { + const release = await this.acquireRead(); + try { + return await f(); + } finally { + release(); + } + } + public async withWrite<T>(f: () => Promise<T>): Promise<T> { + const release = await this.acquireWrite(); + try { + return await f(); + } finally { + release(); + } + } + public abstract isLocked(): boolean; + public abstract waitForUnlock(priority?: number): Promise<void>; +} + +/** + * Single threaded read write lock + * + * Based on https://gist.github.com/CMCDragonkai/4de5c1526fc58dac259e321db8cf5331 + */ +export class RWLockImpl extends RWLockAbstract { + protected _readerCount: number = 0; + protected _writerCount: number = 0; + protected lock: Mutex = new Mutex(); + protected release?: MutexInterface.Releaser; + + public async acquireRead(priority?: number): Promise<() => void> { + const readerCount = ++this._readerCount; + // The first reader locks + if (readerCount === 1) { + this.release = await this.lock.acquire(priority); + } + return () => { + const readerCount = --this._readerCount; + // The last reader unlocks + if (readerCount === 0) { + if (!this.release) + throw new ReferenceError('this.release is undefined'); + this.release(); + } + }; + } + + public async acquireWrite(priority?: number): Promise<() => void> { + ++this._writerCount; + this.release = await this.lock.acquire(priority); + return () => { + --this._writerCount; + if (!this.release) throw new ReferenceError('this.release is undefined'); + this.release(); + }; + } + + public isLocked(): boolean { + return this.lock.isLocked(); + } + + public async waitForUnlock(priority?: number): Promise<void> { + return this.lock.waitForUnlock(priority); + } +} +export type RWLock = RWLockAbstract; +export const RWLock = RWLockImpl; +/** + * Joined RWLock - Joins multiple {@link RWLock}s, allowing them to be used as one; read locking will read lock all children, write locking will write lock all children. + * Note: Locking increments readerCount and writerCount by however many locks are in the join + */ +export class JoinedRWLock extends RWLockAbstract { + public constructor(...childLocks: RWLockAbstract[]) { + super(); + this._children = childLocks; + } + protected _children: RWLockAbstract[]; + protected get _readerCount() { + return this._children + .map((v) => v['_readerCount']) + .reduce((pv, cv) => pv + cv, 0); + } + protected get _writerCount() { + return this._children + .map((v) => v['_writerCount']) + .reduce((pv, cv) => pv + cv, 0); + } + /** + * Helper for acquireRead and acquireWrite + * @param priority Priority passed to acquire + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + private async acquireGeneric( + acquire: (lock: RWLock, priority?: number) => Promise<() => void>, + priority?: number, + greedyAfter = -1 + ) { + if (greedyAfter === -1) await this.waitForUnlock(priority); + else if (greedyAfter !== 0) + await Promise.race([ + this.waitForUnlock(priority), + new Promise((rs) => setTimeout(rs, greedyAfter * 1000)), + ]); + else { + // being greedy immediately, just aquire locks now + } + const _locks = await Promise.allSettled( + this._children.map((v) => acquire(v)) + ); + const errs = [] as unknown[]; + const locks = _locks.map((v) => { + if (v.status === 'rejected') { + errs.push(v.reason); + return null as never; + } else { + return v.value; + } + }); + if (errs.length) { + let unlockErrs: unknown[] = []; + _locks.forEach((v) => { + try { + if (v.status === 'fulfilled') v.value(); + } catch (error) { + unlockErrs.push(error); + } + }); + if (unlockErrs.length) + errs.push( + new AggregateError(unlockErrs, 'Errors encountered during re-unlock') + ); + throw errs.length === 1 + ? errs[0] + : new AggregateError( + errs, + `Encountered ${errs.length} errors whilst locking` + ); + } + return () => { + const errs = [] as unknown[]; + locks.forEach((v) => { + try { + v(); + } catch (error) { + errs.push(v); + } + }); + if (errs) + throw errs.length === 1 + ? errs[0] + : new AggregateError( + errs, + `Encountered ${errs.length} errors whilst unlocking` + ); + }; + } + /** + * @param priority Priority passed to {@link RWLockImpl.acquireRead RWLock.acquireRead} + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + public acquireRead(priority?: number): Promise<() => void> { + return this.acquireGeneric( + (lock, priority) => lock.acquireRead(priority), + priority, + greedyAfter + ); + } + /** + * @param priority Priority passed to {@link RWLockImpl.acquireWrite RWLock.acquireWrite} + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + public acquireWrite( + priority?: number, + greedyAfter = -1 + ): Promise<() => void> { + return this.acquireGeneric( + (lock, priority) => lock.acquireWrite(priority), + priority, + greedyAfter + ); + } + /** If any underlying lock is locked */ + public isLocked() { + if (this._children.find((v) => v.isLocked())) return true; + else return false; + } + /** + * @param priority The priority to pass to the given {@link RWLocks} + */ + public async waitForUnlock(priority?: number): Promise<void> { + while (this.isLocked()) + await this._children.find((v) => v.isLocked())?.waitForUnlock(priority); + } +} +export class LockSet extends Map<string, RWLockImpl> { + /** + * Returns a lock based on the specified name, creating it if it doesn't exist + * @returns {RWLockImpl} The created/found lock. + */ + public get(key: string): RWLockImpl { + const lock = super.get(key); + if (!lock) { + const lock = new RWLockImpl(); + super.set(key, lock); + return lock; + } else return lock; + } + public getJoined(...keys: string[]): JoinedRWLock { + return new JoinedRWLock(...keys.map((key) => this.get(key))); + } + public set(): never { + throw new Error('LockSets should never need .set()'); + } + public delete(): never { + throw new Error( + "LockSets should never need .delete() - if you're sure, use ._delete()" + ); + } + public _delete(key: string) { + return super.delete(key); + } + /** Removes all unlocked locks; a garbage collector essentially */ + public cleanEmpty() { + for (const [k, v] of this.entries()) if (!v.isLocked()) super.delete(k); + } +} |