aboutsummaryrefslogtreecommitdiffstats
path: root/src/lib/vendor/async-mutex/semaphore.ts
diff options
context:
space:
mode:
authorLibravatarLarge Libravatar memdmp <memdmpestrogenzone>2025-08-20 12:46:01 +0000
committerLibravatarLarge Libravatar memdmp <memdmpestrogenzone>2025-08-20 12:46:01 +0000
commit80d0342636056bc839517b64fd11708abea70237 (patch)
tree99c2505b924ab47bc21b215a43cdac2c37c9facf /src/lib/vendor/async-mutex/semaphore.ts
parentdddef149aea597a145e3717b2c461b251e0f6a8d (diff)
downloadcrunched-80d0342636056bc839517b64fd11708abea70237.tar.gz
crunched-80d0342636056bc839517b64fd11708abea70237.tar.bz2
crunched-80d0342636056bc839517b64fd11708abea70237.tar.lz
crunched-80d0342636056bc839517b64fd11708abea70237.zip

feat: locks :3

Diffstat (limited to 'src/lib/vendor/async-mutex/semaphore.ts')
-rw-r--r--src/lib/vendor/async-mutex/semaphore.ts176
1 files changed, 176 insertions, 0 deletions
diff --git a/src/lib/vendor/async-mutex/semaphore.ts b/src/lib/vendor/async-mutex/semaphore.ts
new file mode 100644
index 0000000..385e814
--- /dev/null
+++ b/src/lib/vendor/async-mutex/semaphore.ts
@@ -0,0 +1,176 @@
+import { E_CANCELED } from './errors';
+import type { SemaphoreInterface } from './semaphoreinterface';
+
+interface Priority {
+ priority: number;
+}
+
+interface QueueEntry {
+ resolve(result: [number, SemaphoreInterface.Releaser]): void;
+ reject(error: unknown): void;
+ weight: number;
+ priority: number;
+}
+
+interface Waiter {
+ resolve(): void;
+ priority: number;
+}
+
+class Semaphore implements SemaphoreInterface {
+ constructor(
+ private _value: number,
+ private _cancelError: Error = E_CANCELED
+ ) {}
+
+ acquire(
+ weight = 1,
+ priority = 0
+ ): Promise<[number, SemaphoreInterface.Releaser]> {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ return new Promise((resolve, reject) => {
+ const task: QueueEntry = { resolve, reject, weight, priority };
+ const i = findIndexFromEnd(
+ this._queue,
+ (other) => priority <= other.priority
+ );
+ if (i === -1 && weight <= this._value) {
+ // Needs immediate dispatch, skip the queue
+ this._dispatchItem(task);
+ } else {
+ this._queue.splice(i + 1, 0, task);
+ }
+ });
+ }
+
+ async runExclusive<T>(
+ callback: SemaphoreInterface.Worker<T>,
+ weight = 1,
+ priority = 0
+ ): Promise<T> {
+ const [value, release] = await this.acquire(weight, priority);
+
+ try {
+ return await callback(value);
+ } finally {
+ release();
+ }
+ }
+
+ waitForUnlock(weight = 1, priority = 0): Promise<void> {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ if (this._couldLockImmediately(weight, priority)) {
+ return Promise.resolve();
+ } else {
+ return new Promise((resolve) => {
+ if (!this._weightedWaiters[weight - 1])
+ this._weightedWaiters[weight - 1] = [];
+ insertSorted(this._weightedWaiters[weight - 1], { resolve, priority });
+ });
+ }
+ }
+
+ isLocked(): boolean {
+ return this._value <= 0;
+ }
+
+ getValue(): number {
+ return this._value;
+ }
+
+ setValue(value: number): void {
+ this._value = value;
+ this._dispatchQueue();
+ }
+
+ release(weight = 1): void {
+ if (weight <= 0)
+ throw new Error(`invalid weight ${weight}: must be positive`);
+
+ this._value += weight;
+ this._dispatchQueue();
+ }
+
+ cancel(): void {
+ this._queue.forEach((entry) => entry.reject(this._cancelError));
+ this._queue = [];
+ }
+
+ private _dispatchQueue(): void {
+ this._drainUnlockWaiters();
+ while (this._queue.length > 0 && this._queue[0].weight <= this._value) {
+ this._dispatchItem(this._queue.shift()!);
+ this._drainUnlockWaiters();
+ }
+ }
+
+ private _dispatchItem(item: QueueEntry): void {
+ const previousValue = this._value;
+ this._value -= item.weight;
+ item.resolve([previousValue, this._newReleaser(item.weight)]);
+ }
+
+ private _newReleaser(weight: number): () => void {
+ let called = false;
+
+ return () => {
+ if (called) return;
+ called = true;
+
+ this.release(weight);
+ };
+ }
+
+ private _drainUnlockWaiters(): void {
+ if (this._queue.length === 0) {
+ for (let weight = this._value; weight > 0; weight--) {
+ const waiters = this._weightedWaiters[weight - 1];
+ if (!waiters) continue;
+ waiters.forEach((waiter) => waiter.resolve());
+ this._weightedWaiters[weight - 1] = [];
+ }
+ } else {
+ const queuedPriority = this._queue[0].priority;
+ for (let weight = this._value; weight > 0; weight--) {
+ const waiters = this._weightedWaiters[weight - 1];
+ if (!waiters) continue;
+ const i = waiters.findIndex(
+ (waiter) => waiter.priority <= queuedPriority
+ );
+ (i === -1 ? waiters : waiters.splice(0, i)).forEach((waiter) =>
+ waiter.resolve()
+ );
+ }
+ }
+ }
+
+ private _couldLockImmediately(weight: number, priority: number) {
+ return (
+ (this._queue.length === 0 || this._queue[0].priority < priority) &&
+ weight <= this._value
+ );
+ }
+
+ private _queue: Array<QueueEntry> = [];
+ private _weightedWaiters: Array<Array<Waiter>> = [];
+}
+
+function insertSorted<T extends Priority>(a: T[], v: T) {
+ const i = findIndexFromEnd(a, (other) => v.priority <= other.priority);
+ a.splice(i + 1, 0, v);
+}
+
+function findIndexFromEnd<T>(a: T[], predicate: (e: T) => boolean): number {
+ for (let i = a.length - 1; i >= 0; i--) {
+ if (predicate(a[i])) {
+ return i;
+ }
+ }
+ return -1;
+}
+
+export default Semaphore;