aboutsummaryrefslogtreecommitdiffstats
path: root/src/lib/vendor/async-mutex
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/vendor/async-mutex')
-rw-r--r--src/lib/vendor/async-mutex/README1
-rw-r--r--src/lib/vendor/async-mutex/errors.ts5
-rw-r--r--src/lib/vendor/async-mutex/index.ts7
-rw-r--r--src/lib/vendor/async-mutex/interface.ts26
-rw-r--r--src/lib/vendor/async-mutex/mutex.ts41
-rw-r--r--src/lib/vendor/async-mutex/semaphore.ts176
-rw-r--r--src/lib/vendor/async-mutex/semaphoreinterface.ts34
-rw-r--r--src/lib/vendor/async-mutex/tryaquire.ts21
-rw-r--r--src/lib/vendor/async-mutex/withtimeout.ts140
9 files changed, 451 insertions, 0 deletions
diff --git a/src/lib/vendor/async-mutex/README b/src/lib/vendor/async-mutex/README
new file mode 100644
index 0000000..73714b7
--- /dev/null
+++ b/src/lib/vendor/async-mutex/README
@@ -0,0 +1 @@
+https://github.com/DirtyHairy/async-mutex \ No newline at end of file
diff --git a/src/lib/vendor/async-mutex/errors.ts b/src/lib/vendor/async-mutex/errors.ts
new file mode 100644
index 0000000..02e3eb0
--- /dev/null
+++ b/src/lib/vendor/async-mutex/errors.ts
@@ -0,0 +1,5 @@
+export const E_TIMEOUT = new Error(
+ 'timeout while waiting for mutex to become available'
+);
+export const E_ALREADY_LOCKED = new Error('mutex already locked');
+export const E_CANCELED = new Error('request for lock canceled');
diff --git a/src/lib/vendor/async-mutex/index.ts b/src/lib/vendor/async-mutex/index.ts
new file mode 100644
index 0000000..a309591
--- /dev/null
+++ b/src/lib/vendor/async-mutex/index.ts
@@ -0,0 +1,7 @@
+export { default as Mutex } from './mutex';
+export type { MutexInterface } from './interface';
+export { default as Semaphore } from './semaphore';
+export type { SemaphoreInterface } from './semaphoreinterface';
+export { withTimeout } from './withtimeout';
+export { tryAcquire } from './tryaquire';
+export * from './errors';
diff --git a/src/lib/vendor/async-mutex/interface.ts b/src/lib/vendor/async-mutex/interface.ts
new file mode 100644
index 0000000..736b338
--- /dev/null
+++ b/src/lib/vendor/async-mutex/interface.ts
@@ -0,0 +1,26 @@
+export interface MutexInterface {
+ acquire(priority?: number): Promise<MutexInterface.Releaser>;
+
+ runExclusive<T>(
+ callback: MutexInterface.Worker<T>,
+ priority?: number
+ ): Promise<T>;
+
+ waitForUnlock(priority?: number): Promise<void>;
+
+ isLocked(): boolean;
+
+ release(): void;
+
+ cancel(): void;
+}
+
+export namespace MutexInterface {
+ export interface Releaser {
+ (): void;
+ }
+
+ export interface Worker<T> {
+ (): Promise<T> | T;
+ }
+}
diff --git a/src/lib/vendor/async-mutex/mutex.ts b/src/lib/vendor/async-mutex/mutex.ts
new file mode 100644
index 0000000..9fe6e7a
--- /dev/null
+++ b/src/lib/vendor/async-mutex/mutex.ts
@@ -0,0 +1,41 @@
+import type { MutexInterface } from './interface';
+import Semaphore from './semaphore';
+
+class Mutex implements MutexInterface {
+ constructor(cancelError?: Error) {
+ this._semaphore = new Semaphore(1, cancelError);
+ }
+
+ async acquire(priority = 0): Promise<MutexInterface.Releaser> {
+ const [, releaser] = await this._semaphore.acquire(1, priority);
+
+ return releaser;
+ }
+
+ runExclusive<T>(
+ callback: MutexInterface.Worker<T>,
+ priority = 0
+ ): Promise<T> {
+ return this._semaphore.runExclusive(() => callback(), 1, priority);
+ }
+
+ isLocked(): boolean {
+ return this._semaphore.isLocked();
+ }
+
+ waitForUnlock(priority = 0): Promise<void> {
+ return this._semaphore.waitForUnlock(1, priority);
+ }
+
+ release(): void {
+ if (this._semaphore.isLocked()) this._semaphore.release();
+ }
+
+ cancel(): void {
+ return this._semaphore.cancel();
+ }
+
+ private _semaphore: Semaphore;
+}
+
+export default Mutex;
diff --git a/src/lib/vendor/async-mutex/semaphore.ts b/src/lib/vendor/async-mutex/semaphore.ts
new file mode 100644
index 0000000..385e814
--- /dev/null
+++ b/src/lib/vendor/async-mutex/semaphore.ts
@@ -0,0 +1,176 @@
+import { E_CANCELED } from './errors';
+import type { SemaphoreInterface } from './semaphoreinterface';
+
+interface Priority {
+ priority: number;
+}
+
+interface QueueEntry {
+ resolve(result: [number, SemaphoreInterface.Releaser]): void;
+ reject(error: unknown): void;
+ weight: number;
+ priority: number;
+}
+
+interface Waiter {
+ resolve(): void;
+ priority: number;
+}
+
+class Semaphore implements SemaphoreInterface {
+ constructor(
+ private _value: number,
+ private _cancelError: Error = E_CANCELED
+ ) {}
+
+ acquire(
+ weight = 1,
+ priority = 0
+ ): Promise<[number, SemaphoreInterface.Releaser]> {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ return new Promise((resolve, reject) => {
+ const task: QueueEntry = { resolve, reject, weight, priority };
+ const i = findIndexFromEnd(
+ this._queue,
+ (other) => priority <= other.priority
+ );
+ if (i === -1 && weight <= this._value) {
+ // Needs immediate dispatch, skip the queue
+ this._dispatchItem(task);
+ } else {
+ this._queue.splice(i + 1, 0, task);
+ }
+ });
+ }
+
+ async runExclusive<T>(
+ callback: SemaphoreInterface.Worker<T>,
+ weight = 1,
+ priority = 0
+ ): Promise<T> {
+ const [value, release] = await this.acquire(weight, priority);
+
+ try {
+ return await callback(value);
+ } finally {
+ release();
+ }
+ }
+
+ waitForUnlock(weight = 1, priority = 0): Promise<void> {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ if (this._couldLockImmediately(weight, priority)) {
+ return Promise.resolve();
+ } else {
+ return new Promise((resolve) => {
+ if (!this._weightedWaiters[weight - 1])
+ this._weightedWaiters[weight - 1] = [];
+ insertSorted(this._weightedWaiters[weight - 1], { resolve, priority });
+ });
+ }
+ }
+
+ isLocked(): boolean {
+ return this._value <= 0;
+ }
+
+ getValue(): number {
+ return this._value;
+ }
+
+ setValue(value: number): void {
+ this._value = value;
+ this._dispatchQueue();
+ }
+
+ release(weight = 1): void {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ this._value += weight;
+ this._dispatchQueue();
+ }
+
+ cancel(): void {
+ this._queue.forEach((entry) => entry.reject(this._cancelError));
+ this._queue = [];
+ }
+
+ private _dispatchQueue(): void {
+ this._drainUnlockWaiters();
+ while (this._queue.length > 0 && this._queue[0].weight <= this._value) {
+ this._dispatchItem(this._queue.shift()!);
+ this._drainUnlockWaiters();
+ }
+ }
+
+ private _dispatchItem(item: QueueEntry): void {
+ const previousValue = this._value;
+ this._value -= item.weight;
+ item.resolve([previousValue, this._newReleaser(item.weight)]);
+ }
+
+ private _newReleaser(weight: number): () => void {
+ let called = false;
+
+ return () => {
+ if (called) return;
+ called = true;
+
+ this.release(weight);
+ };
+ }
+
+ private _drainUnlockWaiters(): void {
+ if (this._queue.length === 0) {
+ for (let weight = this._value; weight > 0; weight--) {
+ const waiters = this._weightedWaiters[weight - 1];
+ if (!waiters) continue;
+ waiters.forEach((waiter) => waiter.resolve());
+ this._weightedWaiters[weight - 1] = [];
+ }
+ } else {
+ const queuedPriority = this._queue[0].priority;
+ for (let weight = this._value; weight > 0; weight--) {
+ const waiters = this._weightedWaiters[weight - 1];
+ if (!waiters) continue;
+ const i = waiters.findIndex(
+ (waiter) => waiter.priority <= queuedPriority
+ );
+ (i === -1 ? waiters : waiters.splice(0, i)).forEach((waiter) =>
+ waiter.resolve()
+ );
+ }
+ }
+ }
+
+ private _couldLockImmediately(weight: number, priority: number) {
+ return (
+ (this._queue.length === 0 || this._queue[0].priority < priority) &&
+ weight <= this._value
+ );
+ }
+
+ private _queue: Array<QueueEntry> = [];
+ private _weightedWaiters: Array<Array<Waiter>> = [];
+}
+
+function insertSorted<T extends Priority>(a: T[], v: T) {
+ const i = findIndexFromEnd(a, (other) => v.priority <= other.priority);
+ a.splice(i + 1, 0, v);
+}
+
+function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number {
+ for (let i = a.length - 1; i >= 0; i--) {
+ if (predicate(a[i])) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+export default Semaphore;
diff --git a/src/lib/vendor/async-mutex/semaphoreinterface.ts b/src/lib/vendor/async-mutex/semaphoreinterface.ts
new file mode 100644
index 0000000..bbbdc6b
--- /dev/null
+++ b/src/lib/vendor/async-mutex/semaphoreinterface.ts
@@ -0,0 +1,34 @@
+export interface SemaphoreInterface {
+ acquire(
+ weight?: number,
+ priority?: number
+ ): Promise<[number, SemaphoreInterface.Releaser]>;
+
+ runExclusive<T>(
+ callback: SemaphoreInterface.Worker<T>,
+ weight?: number,
+ priority?: number
+ ): Promise<T>;
+
+ waitForUnlock(weight?: number, priority?: number): Promise<void>;
+
+ isLocked(): boolean;
+
+ getValue(): number;
+
+ setValue(value: number): void;
+
+ release(weight?: number): void;
+
+ cancel(): void;
+}
+
+export namespace SemaphoreInterface {
+ export interface Releaser {
+ (): void;
+ }
+
+ export interface Worker<T> {
+ (value: number): Promise<T> | T;
+ }
+}
diff --git a/src/lib/vendor/async-mutex/tryaquire.ts b/src/lib/vendor/async-mutex/tryaquire.ts
new file mode 100644
index 0000000..359bafa
--- /dev/null
+++ b/src/lib/vendor/async-mutex/tryaquire.ts
@@ -0,0 +1,21 @@
+import { E_ALREADY_LOCKED } from './errors';
+import MutexInterface from './mutex';
+import type { SemaphoreInterface } from './semaphoreinterface';
+import { withTimeout } from './withtimeout';
+
+export function tryAcquire(
+ mutex: MutexInterface,
+ alreadyAcquiredError?: Error
+): MutexInterface;
+export function tryAcquire(
+ semaphore: SemaphoreInterface,
+ alreadyAcquiredError?: Error
+): SemaphoreInterface;
+// eslint-disable-next-lisne @typescript-eslint/explicit-module-boundary-types
+export function tryAcquire(
+ sync: MutexInterface | SemaphoreInterface,
+ alreadyAcquiredError = E_ALREADY_LOCKED
+): typeof sync {
+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
+ return withTimeout(sync as any, 0, alreadyAcquiredError);
+}
diff --git a/src/lib/vendor/async-mutex/withtimeout.ts b/src/lib/vendor/async-mutex/withtimeout.ts
new file mode 100644
index 0000000..0e115b9
--- /dev/null
+++ b/src/lib/vendor/async-mutex/withtimeout.ts
@@ -0,0 +1,140 @@
+/* eslint-disable @typescript-eslint/no-explicit-any */
+import { E_TIMEOUT } from './errors';
+import type { MutexInterface } from './interface';
+import type { SemaphoreInterface } from './semaphoreinterface';
+
+export function withTimeout(
+ mutex: MutexInterface,
+ timeout: number,
+ timeoutError?: Error
+): MutexInterface;
+export function withTimeout(
+ semaphore: SemaphoreInterface,
+ timeout: number,
+ timeoutError?: Error
+): SemaphoreInterface;
+export function withTimeout(
+ sync: MutexInterface | SemaphoreInterface,
+ timeout: number,
+ timeoutError = E_TIMEOUT
+): any {
+ return {
+ acquire: (
+ weightOrPriority?: number,
+ priority?: number
+ ): Promise<
+ MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]
+ > => {
+ let weight: number | undefined;
+ if (isSemaphore(sync)) {
+ weight = weightOrPriority;
+ } else {
+ weight = undefined;
+ priority = weightOrPriority;
+ }
+ if (weight !== undefined && weight <= 0) {
+ throw new Error(`invalid weight ${weight}: must be positive`);
+ }
+
+ return new Promise(async (resolve, reject) => {
+ let isTimeout = false;
+
+ const handle = setTimeout(() => {
+ isTimeout = true;
+ reject(timeoutError);
+ }, timeout);
+
+ try {
+ const ticket = await (isSemaphore(sync)
+ ? sync.acquire(weight, priority)
+ : sync.acquire(priority));
+ if (isTimeout) {
+ const release = Array.isArray(ticket) ? ticket[1] : ticket;
+
+ release();
+ } else {
+ clearTimeout(handle);
+ resolve(ticket);
+ }
+ } catch (e) {
+ if (!isTimeout) {
+ clearTimeout(handle);
+
+ reject(e);
+ }
+ }
+ });
+ },
+
+ async runExclusive<T>(
+ callback: (value?: number) => Promise<T> | T,
+ weight?: number,
+ priority?: number
+ ): Promise<T> {
+ let release: () => void = () => undefined;
+
+ try {
+ const ticket = await this.acquire(weight, priority);
+
+ if (Array.isArray(ticket)) {
+ release = ticket[1];
+
+ return await callback(ticket[0]);
+ } else {
+ release = ticket;
+
+ return await callback();
+ }
+ } finally {
+ release();
+ }
+ },
+
+ release(weight?: number): void {
+ sync.release(weight);
+ },
+
+ cancel(): void {
+ return sync.cancel();
+ },
+
+ waitForUnlock: (
+ weightOrPriority?: number,
+ priority?: number
+ ): Promise<void> => {
+ let weight: number | undefined;
+ if (isSemaphore(sync)) {
+ weight = weightOrPriority;
+ } else {
+ weight = undefined;
+ priority = weightOrPriority;
+ }
+ if (weight !== undefined && weight <= 0) {
+ throw new Error(`invalid weight ${weight}: must be positive`);
+ }
+
+ return new Promise((resolve, reject) => {
+ const handle = setTimeout(() => reject(timeoutError), timeout);
+ (isSemaphore(sync)
+ ? sync.waitForUnlock(weight, priority)
+ : sync.waitForUnlock(priority)
+ ).then(() => {
+ clearTimeout(handle);
+ resolve();
+ });
+ });
+ },
+
+ isLocked: (): boolean => sync.isLocked(),
+
+ getValue: (): number => (sync as SemaphoreInterface).getValue(),
+
+ setValue: (value: number) => (sync as SemaphoreInterface).setValue(value),
+ };
+}
+
+function isSemaphore(
+ sync: SemaphoreInterface | MutexInterface
+): sync is SemaphoreInterface {
+ return (sync as SemaphoreInterface).getValue !== undefined;
+}