import { Mutex, type MutexInterface } from './async-mutex';

/** Contract shared by every read/write lock in this module. */
export interface RWLockInterface {
  readonly readerCount: number;
  readonly writerCount: number;
  acquireRead(priority?: number): Promise<() => void>;
  acquireWrite(priority?: number): Promise<() => void>;
  withRead<T>(f: () => Promise<T>): Promise<T>;
  withWrite<T>(f: () => Promise<T>): Promise<T>;
  isLocked(): boolean;
  waitForUnlock(priority?: number): Promise<void>;
}

/** RWLock Helper Methods + Base Interface */
export abstract class RWLockAbstract implements RWLockInterface {
  protected abstract _readerCount: number;
  protected abstract _writerCount: number;

  public get readerCount(): number {
    return this._readerCount;
  }

  public get writerCount(): number {
    return this._writerCount;
  }

  /** Resolves with a function that gives the read lock back. */
  public abstract acquireRead(priority?: number): Promise<() => void>;

  /** Resolves with a function that gives the write lock back. */
  public abstract acquireWrite(priority?: number): Promise<() => void>;

  /**
   * Runs `f` while holding a read lock; the lock is released whether `f`
   * resolves or rejects.
   * @param f Work to perform under the read lock
   * @returns Whatever `f` resolves to
   */
  public async withRead<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireRead();
    try {
      return await f();
    } finally {
      release();
    }
  }

  /**
   * Runs `f` while holding a write lock; the lock is released whether `f`
   * resolves or rejects.
   * @param f Work to perform under the write lock
   * @returns Whatever `f` resolves to
   */
  public async withWrite<T>(f: () => Promise<T>): Promise<T> {
    const release = await this.acquireWrite();
    try {
      return await f();
    } finally {
      release();
    }
  }

  public abstract isLocked(): boolean;
  public abstract waitForUnlock(priority?: number): Promise<void>;
}

/**
 * Single threaded read write lock
 *
 * Based on https://gist.github.com/CMCDragonkai/4de5c1526fc58dac259e321db8cf5331
 */
export class RWLockImpl extends RWLockAbstract {
  protected _readerCount: number = 0;
  protected _writerCount: number = 0;
  protected lock: Mutex = new Mutex();
  /** Releaser of the most recent successful acquisition of `lock`. */
  protected release?: MutexInterface.Releaser;
  /**
   * Settles once the current group of readers owns `lock`. Created by the
   * first reader of a group and awaited by every reader of that group.
   */
  protected readersAcquired?: Promise<void>;

  /**
   * Acquires the lock for reading. All concurrent readers share a single
   * acquisition of the underlying mutex.
   * @param priority Priority passed to the mutex by the first reader of a group
   * @returns Function that releases this reader; the last reader to release
   * unlocks the mutex
   * @throws Whatever the underlying `lock.acquire` rejects with
   */
  public async acquireRead(priority?: number): Promise<() => void> {
    const readerCount = ++this._readerCount;
    // The first reader locks
    if (readerCount === 1) {
      this.readersAcquired = this.lock.acquire(priority).then((release) => {
        this.release = release;
      });
    }
    try {
      // Later readers must also wait here: the first reader may still be
      // queued behind a writer, and returning early would let this reader
      // run while that writer holds the mutex.
      await this.readersAcquired;
    } catch (error) {
      // Do not leave a phantom reader behind, otherwise the next reader
      // would skip locking.
      --this._readerCount;
      throw error;
    }
    return () => {
      const remaining = --this._readerCount;
      // The last reader unlocks
      if (remaining === 0) {
        if (!this.release) throw new ReferenceError('this.release is undefined');
        this.release();
      }
    };
  }

  /**
   * Acquires the lock for writing (exclusive access).
   * @param priority Priority passed to the underlying mutex
   * @returns Function that releases the write lock
   * @throws Whatever the underlying `lock.acquire` rejects with
   */
  public async acquireWrite(priority?: number): Promise<() => void> {
    ++this._writerCount;
    let release: MutexInterface.Releaser;
    try {
      release = await this.lock.acquire(priority);
    } catch (error) {
      // Roll back the count if the mutex could not be acquired
      --this._writerCount;
      throw error;
    }
    this.release = release;
    return () => {
      --this._writerCount;
      // Use the captured releaser so this writer always unlocks its own
      // acquisition, regardless of later writes to `this.release`.
      release();
    };
  }

  public isLocked(): boolean {
    return this.lock.isLocked();
  }

  public async waitForUnlock(priority?: number): Promise<void> {
    return this.lock.waitForUnlock(priority);
  }
}

export type RWLock = RWLockAbstract;
export const RWLock = RWLockImpl;

/**
 * Joined RWLock - Joins multiple {@link RWLock}s, allowing them to be used as one; read locking will read lock all children, write locking will write lock all children.
 * Note: Locking increments readerCount and writerCount by however many locks are in the join
 */
export class JoinedRWLock extends RWLockAbstract {
  public constructor(...childLocks: RWLockAbstract[]) {
    super();
    this._children = childLocks;
  }

  protected _children: RWLockAbstract[];

  /** Sum of the reader counts of all children. */
  protected get _readerCount(): number {
    return this._children.reduce((sum, child) => sum + child.readerCount, 0);
  }

  /** Sum of the writer counts of all children. */
  protected get _writerCount(): number {
    return this._children.reduce((sum, child) => sum + child.writerCount, 0);
  }

  /**
   * Helper for acquireRead and acquireWrite
   * @param acquire Acquires one child lock and resolves with its releaser
   * @param priority Priority passed to `waitForUnlock` and to every child acquire
   * @param greedyAfter If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
   * @returns Function that releases every child lock
   * @throws The single acquisition error, or an `AggregateError` when several
   * children failed; children that were acquired are released before throwing
   */
  private async acquireGeneric(
    acquire: (lock: RWLock, priority?: number) => Promise<() => void>,
    priority?: number,
    greedyAfter = -1
  ): Promise<() => void> {
    if (greedyAfter === -1) {
      await this.waitForUnlock(priority);
    } else if (greedyAfter !== 0) {
      let timer: ReturnType<typeof setTimeout> | undefined;
      try {
        await Promise.race([
          this.waitForUnlock(priority),
          new Promise<void>((resolve) => {
            timer = setTimeout(resolve, greedyAfter * 1000);
          }),
        ]);
      } finally {
        // Do not leave the timer running when the unlock won the race
        clearTimeout(timer);
      }
    } else {
      // being greedy immediately, just acquire locks now
    }

    const settled = await Promise.allSettled(
      this._children.map((child) => acquire(child, priority))
    );

    const releasers: Array<() => void> = [];
    const errs: unknown[] = [];
    for (const result of settled) {
      if (result.status === 'fulfilled') releasers.push(result.value);
      else errs.push(result.reason);
    }

    if (errs.length) {
      // Partial acquisition: give back what we did get before failing
      const unlockErrs: unknown[] = [];
      for (const release of releasers) {
        try {
          release();
        } catch (error) {
          unlockErrs.push(error);
        }
      }
      if (unlockErrs.length)
        errs.push(
          new AggregateError(unlockErrs, 'Errors encountered during re-unlock')
        );
      throw errs.length === 1
        ? errs[0]
        : new AggregateError(
            errs,
            `Encountered ${errs.length} errors whilst locking`
          );
    }

    return () => {
      // Attempt every release even if one of them throws
      const releaseErrs: unknown[] = [];
      for (const release of releasers) {
        try {
          release();
        } catch (error) {
          releaseErrs.push(error);
        }
      }
      if (releaseErrs.length)
        throw releaseErrs.length === 1
          ? releaseErrs[0]
          : new AggregateError(
              releaseErrs,
              `Encountered ${releaseErrs.length} errors whilst unlocking`
            );
    };
  }

  /**
   * @param priority Priority passed to {@link RWLockImpl.acquireRead RWLock.acquireRead}
   * @param greedyAfter If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
   */
  public acquireRead(priority?: number, greedyAfter = -1): Promise<() => void> {
    return this.acquireGeneric(
      (lock, priority) => lock.acquireRead(priority),
      priority,
      greedyAfter
    );
  }

  /**
   * @param priority Priority passed to {@link RWLockImpl.acquireWrite RWLock.acquireWrite}
   * @param greedyAfter If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
   */
  public acquireWrite(
    priority?: number,
    greedyAfter = -1
  ): Promise<() => void> {
    return this.acquireGeneric(
      (lock, priority) => lock.acquireWrite(priority),
      priority,
      greedyAfter
    );
  }

  /** If any underlying lock is locked */
  public isLocked(): boolean {
    return this._children.some((child) => child.isLocked());
  }

  /**
   * Waits until no child lock reports being locked.
   * @param priority The priority to pass to the given {@link RWLock}s
   */
  public async waitForUnlock(priority?: number): Promise<void> {
    // Re-check after every wait: another child may have been locked meanwhile
    while (this.isLocked())
      await this._children
        .find((child) => child.isLocked())
        ?.waitForUnlock(priority);
  }
}

/** Map of named locks that creates its entries on first access. */
export class LockSet extends Map<string, RWLockImpl> {
  /**
   * Returns a lock based on the specified name, creating it if it doesn't exist
   * @returns The created/found lock.
   */
  public get(key: string): RWLockImpl {
    let lock = super.get(key);
    if (!lock) {
      lock = new RWLockImpl();
      super.set(key, lock);
    }
    return lock;
  }

  /** Builds a {@link JoinedRWLock} over the locks named by `keys`, creating any that are missing. */
  public getJoined(...keys: string[]): JoinedRWLock {
    return new JoinedRWLock(...keys.map((key) => this.get(key)));
  }

  /** Always throws: entries are only ever created through `get`. */
  public set(): never {
    throw new Error('LockSets should never need .set()');
  }

  /** Always throws: use `_delete` to remove an entry deliberately. */
  public delete(): never {
    throw new Error(
      "LockSets should never need .delete() - if you're sure, use ._delete()"
    );
  }

  /** Removes the entry for `key`; returns what `Map.prototype.delete` returns. */
  public _delete(key: string) {
    return super.delete(key);
  }

  /** Removes all unlocked locks; a garbage collector essentially */
  public cleanEmpty() {
    for (const [k, v] of this.entries()) if (!v.isLocked()) super.delete(k);
  }
}