aboutsummaryrefslogtreecommitdiffstats
path: root/src/lib/vendor/lock.ts
diff options
context:
space:
mode:
authorLibravatarLarge Libravatar memdmp <memdmpestrogenzone>2025-08-20 12:46:01 +0000
committerLibravatarLarge Libravatar memdmp <memdmpestrogenzone>2025-08-20 12:46:01 +0000
commit80d0342636056bc839517b64fd11708abea70237 (patch)
tree99c2505b924ab47bc21b215a43cdac2c37c9facf /src/lib/vendor/lock.ts
parentdddef149aea597a145e3717b2c461b251e0f6a8d (diff)
downloadcrunched-80d0342636056bc839517b64fd11708abea70237.tar.gz
crunched-80d0342636056bc839517b64fd11708abea70237.tar.bz2
crunched-80d0342636056bc839517b64fd11708abea70237.tar.lz
crunched-80d0342636056bc839517b64fd11708abea70237.zip

feat: locks :3

Diffstat (limited to 'src/lib/vendor/lock.ts')
-rw-r--r--src/lib/vendor/lock.ts252
1 files changed, 252 insertions, 0 deletions
diff --git a/src/lib/vendor/lock.ts b/src/lib/vendor/lock.ts
new file mode 100644
index 0000000..43249cd
--- /dev/null
+++ b/src/lib/vendor/lock.ts
@@ -0,0 +1,252 @@
+import { Mutex, type MutexInterface } from './async-mutex';
+
+export interface RWLockInterface {
+ readonly readerCount: number;
+ readonly writerCount: number;
+ acquireRead(priority?: number): Promise<() => void>;
+ acquireWrite(priority?: number): Promise<() => void>;
+ withRead<T>(f: () => Promise<T>): Promise<T>;
+ withWrite<T>(f: () => Promise<T>): Promise<T>;
+ isLocked(): boolean;
+ waitForUnlock(priority?: number): Promise<void>;
+}
+
+/** RWLock Helper Methods + Base Interface */
+export abstract class RWLockAbstract implements RWLockInterface {
+ protected abstract _readerCount: number;
+ protected abstract _writerCount: number;
+ public get readerCount(): number {
+ return this._readerCount;
+ }
+ public get writerCount(): number {
+ return this._writerCount;
+ }
+ public abstract acquireRead(priority?: number): Promise<() => void>;
+ public abstract acquireWrite(priority?: number): Promise<() => void>;
+ public async withRead<T>(f: () => Promise<T>): Promise<T> {
+ const release = await this.acquireRead();
+ try {
+ return await f();
+ } finally {
+ release();
+ }
+ }
+ public async withWrite<T>(f: () => Promise<T>): Promise<T> {
+ const release = await this.acquireWrite();
+ try {
+ return await f();
+ } finally {
+ release();
+ }
+ }
+ public abstract isLocked(): boolean;
+ public abstract waitForUnlock(priority?: number): Promise<void>;
+}
+
+/**
+ * Single threaded read write lock
+ *
+ * Based on https://gist.github.com/CMCDragonkai/4de5c1526fc58dac259e321db8cf5331
+ */
+export class RWLockImpl extends RWLockAbstract {
+ protected _readerCount: number = 0;
+ protected _writerCount: number = 0;
+ protected lock: Mutex = new Mutex();
+ protected release?: MutexInterface.Releaser;
+
+ public async acquireRead(priority?: number): Promise<() => void> {
+ const readerCount = ++this._readerCount;
+ // The first reader locks
+ if (readerCount === 1) {
+ this.release = await this.lock.acquire(priority);
+ }
+ return () => {
+ const readerCount = --this._readerCount;
+ // The last reader unlocks
+ if (readerCount === 0) {
+ if (!this.release)
+ throw new ReferenceError('this.release is undefined');
+ this.release();
+ }
+ };
+ }
+
+ public async acquireWrite(priority?: number): Promise<() => void> {
+ ++this._writerCount;
+ this.release = await this.lock.acquire(priority);
+ return () => {
+ --this._writerCount;
+ if (!this.release) throw new ReferenceError('this.release is undefined');
+ this.release();
+ };
+ }
+
+ public isLocked(): boolean {
+ return this.lock.isLocked();
+ }
+
+ public async waitForUnlock(priority?: number): Promise<void> {
+ return this.lock.waitForUnlock(priority);
+ }
+}
+export type RWLock = RWLockAbstract;
+export const RWLock = RWLockImpl;
+/**
+ * Joined RWLock - Joins multiple {@link RWLock}s, allowing them to be used as one; read locking will read lock all children, write locking will write lock all children.
+ * Note: Locking increments readerCount and writerCount by however many locks are in the join
+ */
+export class JoinedRWLock extends RWLockAbstract {
+ public constructor(...childLocks: RWLockAbstract[]) {
+ super();
+ this._children = childLocks;
+ }
+ protected _children: RWLockAbstract[];
+ protected get _readerCount() {
+ return this._children
+ .map((v) => v['_readerCount'])
+ .reduce((pv, cv) => pv + cv, 0);
+ }
+ protected get _writerCount() {
+ return this._children
+ .map((v) => v['_writerCount'])
+ .reduce((pv, cv) => pv + cv, 0);
+ }
+ /**
+ * Helper for acquireRead and acquireWrite
+ * @param priority Priority passed to acquire
+ * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
+ */
+ private async acquireGeneric(
+ acquire: (lock: RWLock, priority?: number) => Promise<() => void>,
+ priority?: number,
+ greedyAfter = -1
+ ) {
+ if (greedyAfter === -1) await this.waitForUnlock(priority);
+ else if (greedyAfter !== 0)
+ await Promise.race([
+ this.waitForUnlock(priority),
+ new Promise((rs) => setTimeout(rs, greedyAfter * 1000)),
+ ]);
+ else {
+ // being greedy immediately, just aquire locks now
+ }
+ const _locks = await Promise.allSettled(
+ this._children.map((v) => acquire(v))
+ );
+ const errs = [] as unknown[];
+ const locks = _locks.map((v) => {
+ if (v.status === 'rejected') {
+ errs.push(v.reason);
+ return null as never;
+ } else {
+ return v.value;
+ }
+ });
+ if (errs.length) {
+ let unlockErrs: unknown[] = [];
+ _locks.forEach((v) => {
+ try {
+ if (v.status === 'fulfilled') v.value();
+ } catch (error) {
+ unlockErrs.push(error);
+ }
+ });
+ if (unlockErrs.length)
+ errs.push(
+ new AggregateError(unlockErrs, 'Errors encountered during re-unlock')
+ );
+ throw errs.length === 1
+ ? errs[0]
+ : new AggregateError(
+ errs,
+ `Encountered ${errs.length} errors whilst locking`
+ );
+ }
+ return () => {
+ const errs = [] as unknown[];
+ locks.forEach((v) => {
+ try {
+ v();
+ } catch (error) {
+ errs.push(v);
+ }
+ });
+ if (errs)
+ throw errs.length === 1
+ ? errs[0]
+ : new AggregateError(
+ errs,
+ `Encountered ${errs.length} errors whilst unlocking`
+ );
+ };
+ }
+ /**
+ * @param priority Priority passed to {@link RWLockImpl.acquireRead RWLock.acquireRead}
+ * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
+ */
+ public acquireRead(priority?: number): Promise<() => void> {
+ return this.acquireGeneric(
+ (lock, priority) => lock.acquireRead(priority),
+ priority,
+ greedyAfter
+ );
+ }
+ /**
+ * @param priority Priority passed to {@link RWLockImpl.acquireWrite RWLock.acquireWrite}
+ * @param greedy If we should attempt to get the underlying locks whilst waiting on others. This gives us priority, but may make other operations take forever. `-1`=Never be greedy, `0`=Immediately be greedy, `>0`=Wait `greedyAfter` seconds before being greedy (leaks a Promise if timeout reached)
+ */
+ public acquireWrite(
+ priority?: number,
+ greedyAfter = -1
+ ): Promise<() => void> {
+ return this.acquireGeneric(
+ (lock, priority) => lock.acquireWrite(priority),
+ priority,
+ greedyAfter
+ );
+ }
+ /** If any underlying lock is locked */
+ public isLocked() {
+ if (this._children.find((v) => v.isLocked())) return true;
+ else return false;
+ }
+ /**
+ * @param priority The priority to pass to the given {@link RWLocks}
+ */
+ public async waitForUnlock(priority?: number): Promise<void> {
+ while (this.isLocked())
+ await this._children.find((v) => v.isLocked())?.waitForUnlock(priority);
+ }
+}
+export class LockSet extends Map<string, RWLockImpl> {
+ /**
+ * Returns a lock based on the specified name, creating it if it doesn't exist
+ * @returns {RWLockImpl} The created/found lock.
+ */
+ public get(key: string): RWLockImpl {
+ const lock = super.get(key);
+ if (!lock) {
+ const lock = new RWLockImpl();
+ super.set(key, lock);
+ return lock;
+ } else return lock;
+ }
+ public getJoined(...keys: string[]): JoinedRWLock {
+ return new JoinedRWLock(...keys.map((key) => this.get(key)));
+ }
+ public set(): never {
+ throw new Error('LockSets should never need .set()');
+ }
+ public delete(): never {
+ throw new Error(
+ "LockSets should never need .delete() - if you're sure, use ._delete()"
+ );
+ }
+ public _delete(key: string) {
+ return super.delete(key);
+ }
+ /** Removes all unlocked locks; a garbage collector essentially */
+ public cleanEmpty() {
+ for (const [k, v] of this.entries()) if (!v.isLocked()) super.delete(k);
+ }
+}