
ExclusiveProcess

ExclusiveProcess is an in-process primitive that ensures at most one run of a named async computation is in progress at any time, even when the computation is invoked from many independent call-sites. Progress events from each run are forwarded to all concurrent callers through a built-in callback fan-out.


Motivation

Several long-running operations in Volodyslav can be triggered from two independent places:

| Operation | Triggered by scheduled job | Triggered by frontend |
|---|---|---|
| Diary-summary pipeline | hourly job in jobs/all.js | POST /diary-summary/run |
| Sync | hourly job in jobs/all.js | POST /sync |

Before ExclusiveProcess existed, these two trigger-paths were completely independent. A second concurrent invocation could therefore:

  1. Start a parallel run (wasting resources and potentially corrupting shared state), or
  2. Queue a redundant run via the old mutex (so the operation ran twice in sequence even though only one run was needed).

Concept

An ExclusiveProcess wraps a single, re-runnable async computation and is parameterised by three types:

  • A — type of the single argument accepted by each invocation.
  • T — return type of the computation.
  • C — type of each progress event broadcast by the computation.
ExclusiveProcess<A, T, C>
│
├─ invoke(arg, cb?) ─ first caller  → starts run, becomes INITIATOR
│                                     cb registered in fan-out list
│
└─ invoke(arg, cb?) ─ second caller → conflictor decides:
                                        "attach" → coalesces onto running run
                                        "queue"  → waits for a fresh run

When the procedure calls fanOut(event), every registered caller callback receives the event — including callbacks from attachers that joined after the run started.

After the computation finishes (success or error) the ExclusiveProcess resets to idle, so the next invoke starts a fresh run.
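
The idle → running → idle cycle described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the actual implementation: `makeSketchProcess` is a hypothetical name, the sketch always attaches (it has no conflictor and no queue), and it defers the start of the procedure to a microtask, which the real primitive may or may not do.

```javascript
// Hypothetical, attach-only sketch of the idle -> running -> idle cycle.
// It is not the real makeExclusiveProcess: there is no conflictor and no queue.
function makeSketchProcess(procedure) {
    let current = null; // { result, callbacks } while a run is in progress

    function invoke(arg, callerCallback) {
        if (current !== null) {
            // Running: join the existing run and share its result promise.
            if (callerCallback) current.callbacks.push(callerCallback);
            return { isInitiator: false, result: current.result };
        }

        // Idle: this caller starts the run and becomes the initiator.
        const run = { callbacks: callerCallback ? [callerCallback] : [], result: null };
        const fanOut = (event) => {
            // Iterate over a copy so that late attachers do not disturb the loop.
            for (const cb of [...run.callbacks]) cb(event);
        };
        const reset = () => { if (current === run) current = null; };
        current = run;
        run.result = Promise.resolve()
            .then(() => procedure(fanOut, arg))
            .then(
                (value) => { reset(); return value; },
                (error) => { reset(); throw error; }
            );
        return { isInitiator: true, result: run.result };
    }

    return { invoke };
}

// Usage: two overlapping calls produce a single run.
let runs = 0;
const received = [];
const sketch = makeSketchProcess(async (fanOut, arg) => {
    runs += 1;
    fanOut(`started:${arg}`);
    return arg * 2;
});

const first = sketch.invoke(21, (e) => received.push(`first saw ${e}`));
const second = sketch.invoke(99, (e) => received.push(`second saw ${e}`)); // arg is ignored

const finished = Promise.all([first.result, second.result]).then((values) => {
    const runsSoFar = runs; // still 1: the second call attached to the first run
    const third = sketch.invoke(1); // the process is idle again at this point
    return { values, runsSoFar, thirdIsInitiator: third.isInitiator };
});
```

Note that the attacher's argument (`99`) is discarded in this sketch: an attached caller shares the initiator's run and therefore the initiator's argument and result.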


API

makeExclusiveProcess<A, T, C>({ procedure, conflictor }) → ExclusiveProcess<A, T, C>

Creates a new, idle ExclusiveProcess.

procedure(fanOut, arg) — the async computation to run. Must return a Promise<T>.

  • fanOut: (cbArg: C) => void — class-managed wrapper; call this to broadcast progress events to all current callers. If a caller's callback throws, the error is caught and logged via console.error; fan-out continues to the remaining callbacks uninterrupted.
  • arg: A — per-invocation argument passed by the caller.
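
The error isolation described for fanOut can be illustrated with a small standalone sketch. `makeFanOut` is a hypothetical helper written for this example; the real wrapper is managed internally and may differ in detail.

```javascript
// Hypothetical sketch of an error-isolating fan-out; not the actual implementation.
function makeFanOut(callbacks) {
    return function fanOut(event) {
        for (const callback of callbacks) {
            try {
                callback(event);
            } catch (error) {
                // A failing listener is reported and skipped; the rest still run.
                console.error("fan-out callback failed:", error);
            }
        }
    };
}

const delivered = [];
const fanOut = makeFanOut([
    (event) => delivered.push(`a:${event}`),
    () => { throw new Error("listener bug"); },
    (event) => delivered.push(`c:${event}`),
]);

fanOut("step-1");
console.log(delivered); // ["a:step-1", "c:step-1"]
```

The point of the try/catch around each individual callback is that one misbehaving listener can neither abort the procedure nor starve the listeners registered after it.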

The procedure is called fresh on each new run.

conflictor(initiating, attaching) → "attach" | "queue" — called when invoke arrives while a run is already in progress.

  • Return "attach" to coalesce the new call onto the current run. The new caller's callerCallback is added to the fan-out list and the new caller shares the current run's result promise.
  • Return "queue" to queue the new call behind the current run. The new caller waits for a fresh run that starts after the current one ends.

To always attach (never queue), pass conflictor: () => "attach".
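
Because a conflictor is a pure function of the two arguments, it can be written and tested in isolation. The sketch below is illustrative only: the name `conflictByMode` and the `mode` field are assumptions made for this example and do not appear in the project.

```javascript
// Hypothetical conflictor: the `mode` field is invented for this illustration.
function conflictByMode(initiating, attaching) {
    // Same mode: the running computation already does what the new caller wants.
    if (attaching.mode === initiating.mode) return "attach";
    // Different mode: the new caller needs a run of its own.
    return "queue";
}

console.log(conflictByMode({ mode: "full" }, { mode: "full" }));  // "attach"
console.log(conflictByMode({ mode: "full" }, { mode: "quick" })); // "queue"
```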


exclusiveProcess.invoke(arg, callerCallback?) → ExclusiveProcessHandle<T>

| State before call | conflictor decision | Behaviour |
|---|---|---|
| Idle | (not consulted) | Starts the run with arg; caller is the initiator |
| Running | "attach" | Attaches; caller becomes an attacher |
| Running | "queue" | Queues behind the current run |

callerCallback is registered in the class-managed fan-out list for the current run (or for the queued run, if queuing). It will be called every time the procedure calls fanOut(event) for the remainder of the run.

Queuing semantics (when conflictor returns "queue"):

  • Last-write-wins on arg: the most-recently queued arg is used when the queued run starts.
  • All queued callers' callbacks are composed: every queued caller receives fan-out events from the queued run, even if their arg was overwritten.
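
The two queuing rules above can be pictured as bookkeeping on a single pending-run record. The following is a rough sketch under that assumption; `makePendingRun`, `enqueue`, `start`, and the `target` field are hypothetical names, and the real implementation may organise this state differently.

```javascript
// Hypothetical bookkeeping for a queued run; names are invented for this sketch.
function makePendingRun() {
    const pending = { arg: undefined, callbacks: [] };
    return {
        enqueue(arg, callerCallback) {
            pending.arg = arg; // last write wins
            if (callerCallback) pending.callbacks.push(callerCallback); // callbacks accumulate
        },
        // Called when the current run ends and the queued run begins.
        start(procedure) {
            const fanOut = (event) => pending.callbacks.forEach((cb) => cb(event));
            return procedure(fanOut, pending.arg);
        },
    };
}

const log = [];
const queued = makePendingRun();
queued.enqueue({ target: "alpha" }, (e) => log.push(`caller-1: ${e}`));
queued.enqueue({ target: "beta" }, (e) => log.push(`caller-2: ${e}`));

queued.start((fanOut, arg) => fanOut(`running with ${arg.target}`));
console.log(log);
// ["caller-1: running with beta", "caller-2: running with beta"]
```

Here caller-1's argument was overwritten by caller-2's, yet caller-1 still receives events from the queued run.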

handle.isInitiator: boolean

true if this particular call started the computation; false if it attached to an already-running one (or is waiting for a queued run).


handle.result: Promise<T>

A promise shared by the initiator and all attachers for the same run. It resolves with the return value of the procedure on success, or rejects with the thrown error on failure.


Guarantees

Progress events reach all concurrent callers

Because all handles for the same run share the same fan-out list, every event emitted via fanOut is delivered to every registered callback — including callbacks registered by attachers that joined after the run started.

const ep = makeExclusiveProcess({
    procedure: (fanOut, arg) => {
        fanOut("step-1");
        fanOut("step-2");
        return Promise.resolve("done");
    },
    conflictor: () => "attach",
});

const steps1 = [];
const steps2 = [];

const h1 = ep.invoke(undefined, (e) => steps1.push(e)); // initiator
const h2 = ep.invoke(undefined, (e) => steps2.push(e)); // attacher

await Promise.all([h1.result, h2.result]);

// Both callers received every event
console.log(steps1); // ["step-1", "step-2"]
console.log(steps2); // ["step-1", "step-2"]

Errors propagate to all callers

Because all handles (initiator + every attacher) share the same Promise object, a rejection is seen by every awaiter — not just the one that started the computation.

const ep = makeExclusiveProcess({
    procedure: (_fanOut, _arg) => Promise.reject(new Error("oops")),
    conflictor: () => "attach",
});

const h1 = ep.invoke(undefined); // initiator
const h2 = ep.invoke(undefined); // attacher

await Promise.all([
    h1.result.catch(e => console.error("h1:", e.message)), // "oops"
    h2.result.catch(e => console.error("h2:", e.message)), // "oops"
]);

Errors do not prevent future runs

_currentPromise is cleared in the rejection handler before the rejection propagates, so the next invoke always sees the process as idle and starts a fresh computation.

const ep = makeExclusiveProcess({
    procedure: (_fanOut, _arg) => Promise.reject(new Error("first failure")),
    conflictor: () => "attach",
});
await ep.invoke(undefined).result.catch(() => {});
// ep is now idle again, so this call starts a fresh run
const h = ep.invoke(undefined);
console.log(h.isInitiator); // true
h.result.catch(() => {}); // this run also rejects; handle it to avoid an unhandled rejection

Usage pattern

Shared singleton per subsystem

Create one ExclusiveProcess instance per long-running operation. The instance must be accessible from every call-site that participates in the exclusion (typically both the route handler and the scheduled job).

Non-parametric dependencies (such as capabilities) can be included in the arg object. The conflictor should inspect only the fields that matter for queuing and ignore the rest.

// backend/src/jobs/diary_summary.js
const { makeExclusiveProcess } = require("../exclusive_process");

/**
 * @typedef {{ type: "entryQueued", path: string }
 *   | { type: "entryProcessed", path: string, status: "success" | "error" }
 * } DiarySummaryEvent
 */

const diarySummaryExclusiveProcess = makeExclusiveProcess({
    // procedure receives fanOut and arg directly
    procedure: (fanOut, { capabilities }) => {
        return _runPipelineUnlocked(capabilities, {
            onEntryQueued: (path) => fanOut({ type: "entryQueued", path }),
            onEntryProcessed: (path, status) => fanOut({ type: "entryProcessed", path, status }),
        });
    },
    // All concurrent calls attach to the same run; no queuing needed.
    conflictor: () => "attach",
});

function runDiarySummaryPipeline(capabilities, callbacks) {
    // Translate fan-out events back into the caller's per-event callbacks.
    const callerCallback = callbacks
        ? (event) => {
            if (event.type === "entryQueued") callbacks.onEntryQueued?.(event.path);
            else if (event.type === "entryProcessed") callbacks.onEntryProcessed?.(event.path, event.status);
        }
        : undefined;
    return diarySummaryExclusiveProcess.invoke({ capabilities }, callerCallback).result;
}

module.exports = { runDiarySummaryPipeline, diarySummaryExclusiveProcess };

Options queuing (sync use-case)

When different callers may supply incompatible arguments, use conflictor to ensure conflicting calls are never silently dropped:

// backend/src/sync/index.js
const synchronizeAllExclusiveProcess = makeExclusiveProcess({
    procedure: (fanOut, { capabilities, options }) => {
        return _synchronizeAllUnlocked(capabilities, options, fanOut);
    },
    // conflictor ignores capabilities; only inspects resetToHostname
    conflictor: (initiating, attaching) => {
        const incomingReset = attaching.options?.resetToHostname;
        if (incomingReset === undefined) return "attach";
        return incomingReset !== initiating.options?.resetToHostname ? "queue" : "attach";
    },
});

function synchronizeAll(capabilities, options, onStepComplete) {
    return synchronizeAllExclusiveProcess.invoke({ capabilities, options }, onStepComplete).result;
}

Relation to withMutex

withMutex serialises callers: the second caller waits for the first to finish, then starts its own run from scratch. ExclusiveProcess coalesces callers: when the conflictor returns "attach", the second caller joins the first caller's run, so overlapping calls share a single execution. The comparison below assumes an always-attach conflictor.

| Property | withMutex | ExclusiveProcess |
|---|---|---|
| Max concurrent runs | 1 | 1 |
| Second caller behaviour | Queued; runs after first | Attached; shares first's result |
| Error propagation | Only to the failed run's caller | All current callers |
| Progress events | Per-caller | Shared fan-out to all callers |
| Total runs (N callers) | N | 1 |
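
The "Total runs" row can be reproduced with two toy helpers. Both are hypothetical stand-ins written for this comparison: `makeSerialiser` only approximates what a mutex such as withMutex does (its real signature is not shown here), and `makeCoalescer` models an always-attach ExclusiveProcess without progress events.

```javascript
// Hypothetical stand-ins for the two strategies; neither is the project's code.
function makeSerialiser() {
    let tail = Promise.resolve();
    // Each call waits for the previous one, then performs its own run.
    return (task) => {
        const result = tail.then(() => task());
        tail = result.catch(() => {}); // keep the chain alive after a failure
        return result;
    };
}

function makeCoalescer(task) {
    let inFlight = null;
    // Calls made while a run is in flight share that run's promise.
    return () => {
        if (inFlight === null) {
            inFlight = Promise.resolve()
                .then(() => task())
                .finally(() => { inFlight = null; });
        }
        return inFlight;
    };
}

const counts = { serialised: 0, coalesced: 0 };
const serialise = makeSerialiser();
const coalesced = makeCoalescer(async () => { counts.coalesced += 1; });

// Three overlapping callers per strategy.
const comparison = Promise.all([
    serialise(async () => { counts.serialised += 1; }),
    serialise(async () => { counts.serialised += 1; }),
    serialise(async () => { counts.serialised += 1; }),
    coalesced(),
    coalesced(),
    coalesced(),
]).then(() => counts);

comparison.then((c) => console.log(c)); // { serialised: 3, coalesced: 1 }
```

With three overlapping callers, the serialiser performs three runs in sequence while the coalescer performs one run whose result all three callers share.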