diff options
Diffstat (limited to 'src/lib/vendor/lock.ts')
| -rw-r--r-- | src/lib/vendor/lock.ts | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/src/lib/vendor/lock.ts b/src/lib/vendor/lock.ts new file mode 100644 index 0000000..43249cd --- /dev/null +++ b/src/lib/vendor/lock.ts @@ -0,0 +1,252 @@ +import { Mutex, type MutexInterface } from './async-mutex'; + +export interface RWLockInterface { + readonly readerCount: number; + readonly writerCount: number; + acquireRead(priority?: number): Promise<() => void>; + acquireWrite(priority?: number): Promise<() => void>; + withRead<T>(f: () => Promise<T>): Promise<T>; + withWrite<T>(f: () => Promise<T>): Promise<T>; + isLocked(): boolean; + waitForUnlock(priority?: number): Promise<void>; +} + +/** RWLock Helper Methods + Base Interface */ +export abstract class RWLockAbstract implements RWLockInterface { + protected abstract _readerCount: number; + protected abstract _writerCount: number; + public get readerCount(): number { + return this._readerCount; + } + public get writerCount(): number { + return this._writerCount; + } + public abstract acquireRead(priority?: number): Promise<() => void>; + public abstract acquireWrite(priority?: number): Promise<() => void>; + public async withRead<T>(f: () => Promise<T>): Promise<T> { + const release = await this.acquireRead(); + try { + return await f(); + } finally { + release(); + } + } + public async withWrite<T>(f: () => Promise<T>): Promise<T> { + const release = await this.acquireWrite(); + try { + return await f(); + } finally { + release(); + } + } + public abstract isLocked(): boolean; + public abstract waitForUnlock(priority?: number): Promise<void>; +} + +/** + * Single threaded read write lock + * + * Based on https://gist.github.com/CMCDragonkai/4de5c1526fc58dac259e321db8cf5331 + */ +export class RWLockImpl extends RWLockAbstract { + protected _readerCount: number = 0; + protected _writerCount: number = 0; + protected lock: Mutex = new Mutex(); + protected release?: MutexInterface.Releaser; + + public async acquireRead(priority?: number): Promise<() => void> { + const readerCount = ++this._readerCount; + // The first reader locks + if (readerCount === 1) { + this.release = await this.lock.acquire(priority); + } + return () => { + const readerCount = --this._readerCount; + // The last reader unlocks + if (readerCount === 0) { + if (!this.release) + throw new ReferenceError('this.release is undefined'); + this.release(); + } + }; + } + + public async acquireWrite(priority?: number): Promise<() => void> { + ++this._writerCount; + this.release = await this.lock.acquire(priority); + return () => { + --this._writerCount; + if (!this.release) throw new ReferenceError('this.release is undefined'); + this.release(); + }; + } + + public isLocked(): boolean { + return this.lock.isLocked(); + } + + public async waitForUnlock(priority?: number): Promise<void> { + return this.lock.waitForUnlock(priority); + } +} +export type RWLock = RWLockAbstract; +export const RWLock = RWLockImpl; +/** + * Joined RWLock - Joins multiple {@link RWLock}s, allowing them to be used as one; read locking will read lock all children, write locking will write lock all children. + * Note: Locking increments readerCount and writerCount by however many locks are in the join + */ +export class JoinedRWLock extends RWLockAbstract { + public constructor(...childLocks: RWLockAbstract[]) { + super(); + this._children = childLocks; + } + protected _children: RWLockAbstract[]; + protected get _readerCount() { + return this._children + .map((v) => v['_readerCount']) + .reduce((pv, cv) => pv + cv, 0); + } + protected get _writerCount() { + return this._children + .map((v) => v['_writerCount']) + .reduce((pv, cv) => pv + cv, 0); + } + /** + * Helper for acquireRead and acquireWrite + * @param priority Priority passed to acquire + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + private async acquireGeneric( + acquire: (lock: RWLock, priority?: number) => Promise<() => void>, + priority?: number, + greedyAfter = -1 + ) { + if (greedyAfter === -1) await this.waitForUnlock(priority); + else if (greedyAfter !== 0) + await Promise.race([ + this.waitForUnlock(priority), + new Promise((rs) => setTimeout(rs, greedyAfter * 1000)), + ]); + else { + // being greedy immediately, just aquire locks now + } + const _locks = await Promise.allSettled( + this._children.map((v) => acquire(v)) + ); + const errs = [] as unknown[]; + const locks = _locks.map((v) => { + if (v.status === 'rejected') { + errs.push(v.reason); + return null as never; + } else { + return v.value; + } + }); + if (errs.length) { + let unlockErrs: unknown[] = []; + _locks.forEach((v) => { + try { + if (v.status === 'fulfilled') v.value(); + } catch (error) { + unlockErrs.push(error); + } + }); + if (unlockErrs.length) + errs.push( + new AggregateError(unlockErrs, 'Errors encountered during re-unlock') + ); + throw errs.length === 1 + ? errs[0] + : new AggregateError( + errs, + `Encountered ${errs.length} errors whilst locking` + ); + } + return () => { + const errs = [] as unknown[]; + locks.forEach((v) => { + try { + v(); + } catch (error) { + errs.push(v); + } + }); + if (errs) + throw errs.length === 1 + ? errs[0] + : new AggregateError( + errs, + `Encountered ${errs.length} errors whilst unlocking` + ); + }; + } + /** + * @param priority Priority passed to {@link RWLockImpl.acquireRead RWLock.acquireRead} + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + public acquireRead(priority?: number): Promise<() => void> { + return this.acquireGeneric( + (lock, priority) => lock.acquireRead(priority), + priority, + greedyAfter + ); + } + /** + * @param priority Priority passed to {@link RWLockImpl.acquireWrite RWLock.acquireWrite} + * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached) + */ + public acquireWrite( + priority?: number, + greedyAfter = -1 + ): Promise<() => void> { + return this.acquireGeneric( + (lock, priority) => lock.acquireWrite(priority), + priority, + greedyAfter + ); + } + /** If any underlying lock is locked */ + public isLocked() { + if (this._children.find((v) => v.isLocked())) return true; + else return false; + } + /** + * @param priority The priority to pass to the given {@link RWLocks} + */ + public async waitForUnlock(priority?: number): Promise<void> { + while (this.isLocked()) + await this._children.find((v) => v.isLocked())?.waitForUnlock(priority); + } +} +export class LockSet extends Map<string, RWLockImpl> { + /** + * Returns a lock based on the specified name, creating it if it doesn't exist + * @returns {RWLockImpl} The created/found lock. + */ + public get(key: string): RWLockImpl { + const lock = super.get(key); + if (!lock) { + const lock = new RWLockImpl(); + super.set(key, lock); + return lock; + } else return lock; + } + public getJoined(...keys: string[]): JoinedRWLock { + return new JoinedRWLock(...keys.map((key) => this.get(key))); + } + public set(): never { + throw new Error('LockSets should never need .set()'); + } + public delete(): never { + throw new Error( + "LockSets should never need .delete() - if you're sure, use ._delete()" + ); + } + public _delete(key: string) { + return super.delete(key); + } + /** Removes all unlocked locks; a garbage collector essentially */ + public cleanEmpty() { + for (const [k, v] of this.entries()) if (!v.isLocked()) super.delete(k); + } +} |