Skip to content

feat!: M14 Conduit — ObservabilityPort, streaming restore, parallel I/O (v4.0.0)#10

Merged
flyingrobots merged 3 commits intomainfrom
feat/m14-conduit
Feb 28, 2026
Merged

feat!: M14 Conduit — ObservabilityPort, streaming restore, parallel I/O (v4.0.0)#10
flyingrobots merged 3 commits intomainfrom
feat/m14-conduit

Conversation

@flyingrobots
Copy link
Member

@flyingrobots flyingrobots commented Feb 27, 2026

Summary

  • ObservabilityPort replaces extends EventEmitter on CasService — domain layer no longer depends on Node's event infrastructure. Three adapters: SilentObserver (no-op default), EventEmitterObserver (backward-compat event bridge), StatsCollector (metric accumulator).
  • Streaming restore via restoreStream()AsyncIterable<Buffer> with O(chunkSize) memory for unencrypted files. restoreFile() rewritten to use createWriteStream + pipeline.
  • Parallel chunk I/Oconcurrency option with counting Semaphore. Store writes and restore reads are gated for configurable parallelism. Default concurrency: 1 preserves sequential behavior.
  • CLI migrated to create EventEmitterObserver per command, attach progress to observer instead of service.

Breaking Changes

  • CasService no longer extends EventEmitter — use EventEmitterObserver adapter for event subscriptions
  • observability is now a required constructor port for CasService (facade defaults to SilentObserver)

Files

  • Created: 12 files (port, 3 adapters, semaphore, 7 test files)
  • Modified: ~27 files (CasService, facade, CLI, types, 15 test files, package.json, jsr.json, CHANGELOG, README)

Test plan

  • npm test — 567 tests pass (524 existing + 43 new)
  • npx eslint . — clean
  • npx jsr publish --dry-run --allow-dirty — passes
  • Manual: git cas store ./testfile --slug test --tree → progress bar works
  • Manual: git cas restore --slug test --out ./restored → progress bar works, file matches
  • Manual: git cas vault dashboard → renders correctly

Summary by CodeRabbit

  • New Features

    • Streaming restore: memory-efficient AsyncIterable restore and file streaming
    • Parallel chunk I/O: configurable concurrency for faster store/restore
    • Observability adapters: SilentObserver (no-op), EventEmitterObserver (event bridge), StatsCollector (metrics summary)
  • Breaking Changes

    • Service no longer emits events directly; an observability port is required (EventEmitterObserver remains backward-compatible)
  • Documentation

    • Updated README, roadmap, and examples for streaming, concurrency, and observability
  • Tests

    • Expanded unit coverage (new tests for streaming, concurrency, semaphore, and adapters)

BREAKING CHANGE: CasService no longer extends EventEmitter. The new
ObservabilityPort replaces all emit() calls with metric(channel, data).
EventEmitterObserver provides backward-compatible event bridging.

- Add ObservabilityPort, SilentObserver, EventEmitterObserver, StatsCollector
- Add restoreStream() async generator with O(chunkSize) streaming
- Add concurrency option with Semaphore-gated parallel chunk I/O
- Rewrite restoreFile() to use createWriteStream + pipeline
- Migrate CLI progress tracking to EventEmitterObserver
- 567 tests passing, eslint clean, JSR validated
@coderabbitai
Copy link

coderabbitai bot commented Feb 27, 2026

Warning

Rate limit exceeded

@flyingrobots has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 25 minutes and 11 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 11eed5f and 54ebfc0.

📒 Files selected for processing (8)
  • ROADMAP.md
  • src/domain/services/CasService.js
  • src/domain/services/Semaphore.js
  • src/infrastructure/adapters/StatsCollector.js
  • test/unit/domain/services/CasService.events.test.js
  • test/unit/domain/services/CasService.test.js
  • test/unit/domain/services/Semaphore.test.js
  • test/unit/infrastructure/adapters/StatsCollector.test.js
📝 Walkthrough

Walkthrough

This PR implements v4.0.0 (Conduit): removes EventEmitter from CasService in favor of a required ObservabilityPort and adapters (SilentObserver, EventEmitterObserver, StatsCollector); adds restoreStream() async generator for streaming restores, Semaphore-based concurrency for parallel chunk I/O, streaming restoreFile(), and updates CLI/examples/tests to use the new observability pattern.

Changes

Cohort / File(s) Summary
Version & packaging
jsr.json, package.json
Bump version to 4.0.0.
Changelog & Docs
CHANGELOG.md, README.md, ROADMAP.md
Document v4.0.0 breaking changes, observability port, streaming restore, and parallel chunking; roadmap updated with M14 Conduit tasks.
Public surface & exports
index.js, index.d.ts
Export ObservabilityPort, SilentObserver, EventEmitterObserver, StatsCollector; ContentAddressableStore options add observability? and concurrency; add restoreStream to public API.
Core service
src/domain/services/CasService.js, src/domain/services/CasService.d.ts
Remove EventEmitter inheritance; constructor now requires/accepts observability and concurrency; replace emit() with observability.metric(); add restoreStream async generator; refactor _chunkAndStore for semaphore-gated parallel writes; add encryption/compression/key-resolution helpers.
Observability port & adapters
src/ports/ObservabilityPort.js, src/infrastructure/adapters/SilentObserver.js, src/infrastructure/adapters/EventEmitterObserver.js, src/infrastructure/adapters/StatsCollector.js
Add ObservabilityPort abstract interface; provide SilentObserver (no-op), EventEmitterObserver (bridges metric→EventEmitter with on/removeListener/listenerCount), and StatsCollector (accumulates metrics, exposes summary).
Concurrency primitive
src/domain/services/Semaphore.js
Add counting Semaphore with acquire()/release() and validation of positive integer concurrency.
Facade & streaming I/O
index.js, bin/git-cas.js, bin/ui/progress.js
Wire observability and concurrency through ContentAddressableStore; add restoreStream usage in restoreFile (Readable + Transform + pipeline); CLI now creates EventEmitterObserver for progress and passes observer to CAS; JSDoc updated to reflect observer attach signature.
Examples
examples/progress-tracking.js
Migrate example to create EventEmitterObserver and subscribe to observer events rather than listening on service EventEmitter.
Tests — observability & adapters
test/unit/infrastructure/adapters/*, test/unit/ports/ObservabilityPort.test.js
Add tests for ObservabilityPort abstract behavior, SilentObserver, EventEmitterObserver routing/listener behavior, and StatsCollector summary/throughput.
Tests — CasService & concurrency
test/unit/domain/services/*.test.js, test/unit/domain/services/CasService.parallel.test.js, test/unit/domain/services/CasService.restoreStream.test.js
Update many CasService tests to inject SilentObserver; add 43 tests including parallel/concurrency scenarios, restoreStream coverage, and stream-error expectations.
Tests — Semaphore
test/unit/domain/services/Semaphore.test.js
Add tests validating concurrency limits, ordering, and constructor validation.
CLI tests
test/unit/cli/progress.test.js
Update tests to use EventEmitterObserver and metric(...) instead of direct EventEmitter.emit(...).

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant CasService
    participant Semaphore
    participant Persistence
    participant ObservabilityPort

    Client->>CasService: store(manifest, buffer, options)
    activate CasService

    loop for each chunk
        CasService->>Semaphore: acquire()
        activate Semaphore
        Semaphore-->>CasService: permit
        deactivate Semaphore

        par write chunk (concurrency-limited)
            CasService->>Persistence: writeBlob(chunk)
            activate Persistence
            Persistence-->>CasService: blobRef
            deactivate Persistence

            CasService->>ObservabilityPort: metric('chunk',{action:'stored',...})
            activate ObservabilityPort
            ObservabilityPort-->>CasService: void
            deactivate ObservabilityPort
        and
            CasService->>Semaphore: release()
            activate Semaphore
            Semaphore-->>Semaphore: wake next waiter
            deactivate Semaphore
        end
    end

    CasService->>Persistence: writeTree(manifest)
    CasService-->>Client: manifest
    deactivate CasService
Loading
sequenceDiagram
    participant Client
    participant CasService
    participant Persistence
    participant Crypto
    participant ObservabilityPort

    Client->>CasService: restoreStream({manifest, encryptionKey})
    activate CasService

    loop each chunkRef in manifest
        CasService->>Persistence: readBlob(blobRef)
        activate Persistence
        Persistence-->>CasService: data
        deactivate Persistence

        alt encrypted
            CasService->>Crypto: decrypt(data, key)
            activate Crypto
            Crypto-->>CasService: plaintext
            deactivate Crypto
        end

        alt compressed
            CasService->>CasService: decompress(plaintext)
        end

        CasService->>ObservabilityPort: metric('chunk',{action:'restored',...})
        activate ObservabilityPort
        ObservabilityPort-->>CasService: void
        deactivate ObservabilityPort

        CasService-->>Client: yield chunk (AsyncIterable<Buffer>)
    end

    CasService->>ObservabilityPort: metric('file',{action:'restored',...})
    deactivate CasService
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~70 minutes

Possibly related PRs

Poem

🐰 A hop into Conduit’s stream,

no events to snag at the seam.
Observers watch, stats quietly hum,
semaphores guard each chunk that’s come.
Hooray — v4 hops lightly, swift and neat!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 52.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main changes: introduction of ObservabilityPort, streaming restore, and parallel I/O with a major version bump to 4.0.0.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/m14-conduit

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
test/unit/domain/services/Semaphore.test.js (1)

46-58: Add a regression test for extra release() calls.

Coverage is good, but adding an invariant test for over-release would prevent future underflow regressions.

🧪 Suggested additional test
 describe('Semaphore – validation', () => {
@@
   it('throws on concurrency: 1.5', () => {
     expect(() => new Semaphore(1.5)).toThrow();
   });
+
+  it('throws when release is called without an active permit', () => {
+    const sem = new Semaphore(1);
+    expect(() => sem.release()).toThrow();
+  });
 });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/unit/domain/services/Semaphore.test.js` around lines 46 - 58, Add a
regression unit test that asserts over-releasing the Semaphore is not allowed:
create a Semaphore instance (use new Semaphore(1)), acquire() once, release()
once, then assert that a second release() either throws (expect(() =>
semaphore.release()).toThrow()) or otherwise fails the invariant; name the test
something like "throws on extra release()" and reference the Semaphore class and
its release()/acquire() methods so future changes cannot underflow the internal
counter.
examples/progress-tracking.js (1)

28-31: Remove or consolidate unused cas instance.

The cas variable created on line 31 is never used for any operations — all store/restore calls use cas2 (lines 113, 136, 160, 197). This creates confusion about which instance should be used.

♻️ Consolidate to single CAS instance
-// Initialize plumbing and CAS with an EventEmitterObserver
+// Initialize plumbing with an EventEmitterObserver
 const plumbing = GitPlumbing.createDefault({ cwd: repoDir });
 const observer = new EventEmitterObserver();
-const cas = ContentAddressableStore.createJson({ plumbing, chunkSize: 128 * 1024 }); // 128 KB chunks
+const cas = new ContentAddressableStore({
+  plumbing,
+  chunkSize: 128 * 1024,
+  observability: observer,
+});

Then remove lines 105-109 and use cas throughout instead of cas2.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/progress-tracking.js` around lines 28 - 31, The file defines two
ContentAddressableStore instances (cas and cas2) but only cas2 is used later;
remove the unused duplicate by deleting the cas2 creation (the call to
ContentAddressableStore.createJson that assigns cas2) and replace all references
to cas2 with cas so the single instance created as cas (using plumbing and
EventEmitterObserver) is used for all store/restore operations; ensure any
configuration (chunkSize, plumbing, observer) applied to cas2 is preserved on
the cas creation if needed.
src/infrastructure/adapters/EventEmitterObserver.js (1)

22-32: Consider defensive handling for missing action field.

If metric() is called with data that doesn't include an action property, the event name becomes "${channel}:undefined", which could cause unexpected behavior for listeners expecting well-formed event names.

🛡️ Optional defensive check
   metric(channel, data) {
+    if (!data?.action) {
+      return; // Skip metrics without action
+    }
     const eventName = `${channel}:${data.action}`;
     if (channel === 'error') {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/infrastructure/adapters/EventEmitterObserver.js` around lines 22 - 32,
The metric() method builds an event name from data.action but doesn't guard
against missing action; update metric(channel, data) to first validate that data
&& typeof data.action === 'string' (or at least that 'action' in data) and
return early (or emit an error) when it's absent to avoid emitting events like
"channel:undefined"; locate the metric function and modify the eventName
construction and payload creation (the variables eventName and payload and the
this.#emitter emits) so you bail out or handle a default action before calling
this.#emitter.emit.
index.js (1)

170-184: Factory methods don't forward observability/concurrency options.

createJson() and createCbor() don't accept or forward observability or concurrency options. This is fine for simple use cases since the defaults (SilentObserver, concurrency: 1) are applied, but consumers needing custom observability must use the constructor directly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@index.js` around lines 170 - 184, The factory methods createJson and
createCbor should accept and forward observability and concurrency options to
the ContentAddressableStore constructor; update the signatures of createJson({
plumbing, chunkSize, policy, observability, concurrency }) and createCbor({
plumbing, chunkSize, policy, observability, concurrency }) (or use defaults) and
include observability and concurrency in the options passed into new
ContentAddressableStore({ plumbing, chunkSize, codec: new JsonCodec()/new
CborCodec(), policy, observability, concurrency }) so callers can supply custom
observers and concurrency settings.
test/unit/domain/services/CasService.events.test.js (1)

180-201: Consider reusing the setup() helper to reduce duplication.

setupSilent() duplicates the persistence mock and service instantiation from setup(). You could parameterize setup() to accept an optional observer:

♻️ Proposed refactor
-function setup() {
+function setup(observer = new EventEmitterObserver()) {
   const crypto = new NodeCryptoAdapter();
   const blobStore = new Map();
-  const observer = new EventEmitterObserver();
   // ... rest of setup
 }

-function setupSilent() {
-  const crypto = new NodeCryptoAdapter();
-  const blobStore = new Map();
-  const mockPersistence = {
-    writeBlob: vi.fn().mockImplementation(async (content) => {
-      const buf = Buffer.isBuffer(content) ? content : Buffer.from(content);
-      const oid = await crypto.sha256(buf);
-      blobStore.set(oid, buf);
-      return oid;
-    }),
-    writeTree: vi.fn().mockResolvedValue('mock-tree-oid'),
-    readBlob: vi.fn().mockImplementation(async (oid) => {
-      const buf = blobStore.get(oid);
-      if (!buf) { throw new Error(`Blob not found: ${oid}`); }
-      return buf;
-    }),
-  };
-  return new CasService({
-    persistence: mockPersistence, crypto, codec: new JsonCodec(),
-    observability: new SilentObserver(), chunkSize: 1024,
-  });
-}
+function setupSilent() {
+  return setup(new SilentObserver()).service;
+}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/unit/domain/services/CasService.events.test.js` around lines 180 - 201,
The setupSilent() helper duplicates the persistence mock and CasService
construction; refactor so setup accepts an optional observer parameter and reuse
it: move the mockPersistence creation and NodeCryptoAdapter/JsonCodec/CasService
instantiation into setup (or have setup return { service, mockPersistence }) and
update setupSilent to call setup(new SilentObserver()) or setup({ observer: new
SilentObserver() }); ensure references to setupSilent, setup, mockPersistence,
CasService, NodeCryptoAdapter, JsonCodec, and SilentObserver are updated so
tests use the single shared setup factory and remove the duplicated mock
implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/domain/services/Semaphore.js`:
- Around line 36-42: The release() method in class Semaphore can decrement the
private field `#active` below zero when called without a held permit; update
Semaphore.release to guard against underflow by checking if this.#queue.length
=== 0 then only decrement this.#active if this.#active > 0, otherwise either
clamp to zero or throw/log a clear error/warning about releasing without a
permit; keep the existing branch that shifts and invokes next() on this.#queue,
and reference the Semaphore.release method and private fields `#queue` and `#active`
when making the change.

In `@src/infrastructure/adapters/StatsCollector.js`:
- Around line 14-17: Guard against missing or non-numeric chunk payloads before
updating metrics: in the StatsCollector logic that handles channel === 'chunk'
(where it currently does this.#chunksProcessed++ and this.#bytesTotal +=
data.size || 0), first safely read and normalize size using a safe access (e.g.
data?.size), coerce to a Number and validate with Number.isFinite (fall back to
0 for undefined/NaN/Infinity), then increment this.#chunksProcessed and add the
validated numeric size to this.#bytesTotal so no exception is thrown and
non-numeric values cannot corrupt the total.

---

Nitpick comments:
In `@examples/progress-tracking.js`:
- Around line 28-31: The file defines two ContentAddressableStore instances (cas
and cas2) but only cas2 is used later; remove the unused duplicate by deleting
the cas2 creation (the call to ContentAddressableStore.createJson that assigns
cas2) and replace all references to cas2 with cas so the single instance created
as cas (using plumbing and EventEmitterObserver) is used for all store/restore
operations; ensure any configuration (chunkSize, plumbing, observer) applied to
cas2 is preserved on the cas creation if needed.

In `@index.js`:
- Around line 170-184: The factory methods createJson and createCbor should
accept and forward observability and concurrency options to the
ContentAddressableStore constructor; update the signatures of createJson({
plumbing, chunkSize, policy, observability, concurrency }) and createCbor({
plumbing, chunkSize, policy, observability, concurrency }) (or use defaults) and
include observability and concurrency in the options passed into new
ContentAddressableStore({ plumbing, chunkSize, codec: new JsonCodec()/new
CborCodec(), policy, observability, concurrency }) so callers can supply custom
observers and concurrency settings.

In `@src/infrastructure/adapters/EventEmitterObserver.js`:
- Around line 22-32: The metric() method builds an event name from data.action
but doesn't guard against missing action; update metric(channel, data) to first
validate that data && typeof data.action === 'string' (or at least that 'action'
in data) and return early (or emit an error) when it's absent to avoid emitting
events like "channel:undefined"; locate the metric function and modify the
eventName construction and payload creation (the variables eventName and payload
and the this.#emitter emits) so you bail out or handle a default action before
calling this.#emitter.emit.

In `@test/unit/domain/services/CasService.events.test.js`:
- Around line 180-201: The setupSilent() helper duplicates the persistence mock
and CasService construction; refactor so setup accepts an optional observer
parameter and reuse it: move the mockPersistence creation and
NodeCryptoAdapter/JsonCodec/CasService instantiation into setup (or have setup
return { service, mockPersistence }) and update setupSilent to call setup(new
SilentObserver()) or setup({ observer: new SilentObserver() }); ensure
references to setupSilent, setup, mockPersistence, CasService,
NodeCryptoAdapter, JsonCodec, and SilentObserver are updated so tests use the
single shared setup factory and remove the duplicated mock implementation.

In `@test/unit/domain/services/Semaphore.test.js`:
- Around line 46-58: Add a regression unit test that asserts over-releasing the
Semaphore is not allowed: create a Semaphore instance (use new Semaphore(1)),
acquire() once, release() once, then assert that a second release() either
throws (expect(() => semaphore.release()).toThrow()) or otherwise fails the
invariant; name the test something like "throws on extra release()" and
reference the Semaphore class and its release()/acquire() methods so future
changes cannot underflow the internal counter.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b458797 and 502c92d.

📒 Files selected for processing (39)
  • CHANGELOG.md
  • README.md
  • ROADMAP.md
  • bin/git-cas.js
  • examples/progress-tracking.js
  • index.d.ts
  • index.js
  • jsr.json
  • package.json
  • src/domain/services/CasService.d.ts
  • src/domain/services/CasService.js
  • src/domain/services/Semaphore.js
  • src/infrastructure/adapters/EventEmitterObserver.js
  • src/infrastructure/adapters/SilentObserver.js
  • src/infrastructure/adapters/StatsCollector.js
  • src/ports/ObservabilityPort.js
  • test/unit/cli/progress.test.js
  • test/unit/domain/services/CasService.codec.test.js
  • test/unit/domain/services/CasService.compression.test.js
  • test/unit/domain/services/CasService.crypto.test.js
  • test/unit/domain/services/CasService.deleteAsset.test.js
  • test/unit/domain/services/CasService.empty-file.test.js
  • test/unit/domain/services/CasService.errors.test.js
  • test/unit/domain/services/CasService.events.test.js
  • test/unit/domain/services/CasService.findOrphanedChunks.test.js
  • test/unit/domain/services/CasService.kdf.test.js
  • test/unit/domain/services/CasService.key-validation.test.js
  • test/unit/domain/services/CasService.merkle.test.js
  • test/unit/domain/services/CasService.parallel.test.js
  • test/unit/domain/services/CasService.readManifest.test.js
  • test/unit/domain/services/CasService.restore.test.js
  • test/unit/domain/services/CasService.restoreStream.test.js
  • test/unit/domain/services/CasService.stream-error.test.js
  • test/unit/domain/services/CasService.test.js
  • test/unit/domain/services/Semaphore.test.js
  • test/unit/infrastructure/adapters/EventEmitterObserver.test.js
  • test/unit/infrastructure/adapters/SilentObserver.test.js
  • test/unit/infrastructure/adapters/StatsCollector.test.js
  • test/unit/ports/ObservabilityPort.test.js

Code review fixes (11 issues):
- C1: drain in-flight writes with Promise.allSettled on stream error
- M1: remove dead cas/plumbing variables in progress-tracking example
- M2: fix stale JSDoc return types in progress.js
- m1: merge duplicate node:fs imports in index.js
- m2: document restoreStream yielded-chunk semantics
- m3: rename chunksWritten → chunksDispatched in STREAM_ERROR
- m5: add parallel stream-error test (concurrency: 4)
- N1: rename N → readAhead in _restoreStreaming
- N3: move eventName after error guard in EventEmitterObserver

Roadmap additions:
- 6 visions: snapshot trees, portable bundles, manifest diff,
  CompressionPort, watch mode, interactive passphrase prompt
- 7 concerns with mitigations and defensive test specs: memory
  amplification, orphaned blobs, chunk size cap, Web Crypto
  buffering, passphrase exposure, KDF rate limiting, GCM nonces
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/domain/services/CasService.js (1)

107-116: ⚠️ Potential issue | 🟠 Major

Drain in-flight writes before rethrowing CasError from the source stream.

If the source stream throws a CasError (line 108), the code rethrows immediately without awaiting Promise.allSettled(pending), leaving launched writes unresolved. This can create nondeterministic side effects after the caller handles the exception.

Suggested fix
    } catch (err) {
-      if (err instanceof CasError) { throw err; }
-      const casErr = new CasError(
-        `Stream error during store: ${err.message}`,
-        'STREAM_ERROR',
-        { chunksDispatched: nextIndex, originalError: err },
-      );
       await Promise.allSettled(pending);
+      if (err instanceof CasError) { throw err; }
+      const casErr = new CasError(
+        `Stream error during store: ${err.message}`,
+        'STREAM_ERROR',
+        { chunksDispatched: nextIndex, originalError: err },
+      );
       this.observability.metric('error', { code: casErr.code, message: casErr.message });
       throw casErr;
    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/domain/services/CasService.js` around lines 107 - 116, The catch
currently rethrows immediately when err instanceof CasError, leaving pending
writes unresolved; change the flow so that in the catch block you always await
Promise.allSettled(pending) before rethrowing (i.e., if err instanceof CasError:
await Promise.allSettled(pending) then throw err), and for the non-CasError path
keep creating casErr, await Promise.allSettled(pending) (as already done), emit
the observability.metric, and then throw; this ensures pending (the variable
used for launched writes) is drained before any throw from this method.
🧹 Nitpick comments (2)
test/unit/domain/services/CasService.parallel.test.js (2)

61-106: Add assertions that concurrency is actually exercised (not just correctness).

These cases verify round-trip integrity, but they don’t prove the semaphore is enforcing/using concurrency. A fully sequential implementation could still pass. Consider instrumenting writeBlob/readBlob with small async delays and tracking max in-flight operations, then assert:

  • maxInFlight > 1 when concurrency: 4
  • maxInFlight <= configuredConcurrency

Also, the concurrency: 10 test currently uses a 1-chunk payload, so it can’t exercise contention.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/unit/domain/services/CasService.parallel.test.js` around lines 61 - 106,
Instrument the CAS I/O in the tests to assert concurrency is actually used: wrap
or monkeypatch the service methods writeBlob and/or readBlob inside the tests
(where setup(), storeBuffer(), restoreStream(), and service are available) to
introduce a small async delay and track an in-flight counter that updates
before/after each call; record maxInFlight and add assertions that maxInFlight >
1 and maxInFlight <= configured concurrency for the "concurrency: 4" cases. Also
modify the "concurrency: 10" test to use a larger payload that produces multiple
chunks (instead of the 1-chunk 512-byte buffer) so it can exercise contention,
then assert maxInFlight respects the configuredConcurrency there as well. Ensure
you restore/undo any monkeypatch after each test.

109-142: Strengthen stream-error test to verify in-flight drain behavior.

This test validates error mapping and metadata well, but it doesn’t assert that already-dispatched async writes are settled before rejection. Adding that assertion would protect the regression this PR explicitly fixes.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/unit/domain/services/CasService.parallel.test.js` around lines 109 -
142, Enhance the "concurrency: 4 — STREAM_ERROR" test to also assert that any
already-dispatched async writes have settled before the store() rejection: after
catching the CasError from service.store (the failingSource helper), verify via
an observable on the service (e.g., a public counter or promise that indicates
settled writes) or a spy on the underlying write handler that the number of
completed writes matches err.meta.chunksDispatched; if needed add a small helper
on the test-only service fixture to await writes completion and assert it
resolves before finishing the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ROADMAP.md`:
- Line 2406: Several fenced code blocks in ROADMAP.md are using bare triple
backticks (```) which triggers markdownlint rule MD040; update each of those
fences (the occurrences using ```) to include an explicit language identifier
(for example `text`, `json`, `js`, or `shell`) so they read like ```text or
```json etc.; scan for all triple-backtick blocks in ROADMAP.md and replace the
opening fence tokens to include the appropriate language tag to satisfy MD040
and improve readability.
- Line 2520: Escape the unescaped pipe characters inside inline code that are
breaking table columns; specifically update occurrences such as `compression: {
algorithm: 'gzip' | 'zstd' }` (and any other inline code in the same table
cells) to escape the pipe as `\|` so the Markdown table parser treats it as
text, and verify similar occurrences near the
`this.compression.compress(source)` and `this.compression.decompress(buffer)`
mentions are fixed as well (escape any `|` inside inline code in those cells).

In `@src/domain/services/CasService.js`:
- Around line 32-40: The constructor of CasService should validate the
observability adapter at initialization and fail fast: in the constructor ({
persistence, codec, crypto, observability, ... }) check that observability is
present and implements the required methods used by the class (e.g., typeof
observability.metric === 'function' or other expected methods like
counter/timer) and throw a clear Error if not; update the constructor in
src/domain/services/CasService.js to perform this guard before assigning
this.observability so later calls to metric(...) don't throw unexpectedly.
- Around line 484-486: In the restore path that currently returns early on an
empty manifest (checking manifest.chunks.length === 0) emit the same completion
observability metric/handler used for non-empty restores (the "file:restored"
metric) before returning; locate the early-return in CasService.js (the block
referencing manifest.chunks) and either call the existing metric emission helper
(or this.metrics/observability method used elsewhere in CasService) right before
the return or refactor to centralize final completion emission so empty-manifest
restores also trigger "file:restored".
- Around line 543-557: The prefetched promise array (ahead) produced by
readAndVerify() can leak if an awaited promise (await ahead[i % readAhead])
rejects and the generator exits; wrap the main restore loop that consumes ahead
(the for loop iterating over chunks) in a try...finally so that in the finally
block you await settlement of any remaining in-flight promises in ahead (e.g.,
await Promise.allSettled(ahead.filter(Boolean))) to ensure all readAndVerify()
operations finish; keep using readAhead, chunks, ahead, and readAndVerify names
so the fix is easy to locate.

---

Outside diff comments:
In `@src/domain/services/CasService.js`:
- Around line 107-116: The catch currently rethrows immediately when err
instanceof CasError, leaving pending writes unresolved; change the flow so that
in the catch block you always await Promise.allSettled(pending) before
rethrowing (i.e., if err instanceof CasError: await Promise.allSettled(pending)
then throw err), and for the non-CasError path keep creating casErr, await
Promise.allSettled(pending) (as already done), emit the observability.metric,
and then throw; this ensures pending (the variable used for launched writes) is
drained before any throw from this method.

---

Nitpick comments:
In `@test/unit/domain/services/CasService.parallel.test.js`:
- Around line 61-106: Instrument the CAS I/O in the tests to assert concurrency
is actually used: wrap or monkeypatch the service methods writeBlob and/or
readBlob inside the tests (where setup(), storeBuffer(), restoreStream(), and
service are available) to introduce a small async delay and track an in-flight
counter that updates before/after each call; record maxInFlight and add
assertions that maxInFlight > 1 and maxInFlight <= configured concurrency for
the "concurrency: 4" cases. Also modify the "concurrency: 10" test to use a
larger payload that produces multiple chunks (instead of the 1-chunk 512-byte
buffer) so it can exercise contention, then assert maxInFlight respects the
configuredConcurrency there as well. Ensure you restore/undo any monkeypatch
after each test.
- Around line 109-142: Enhance the "concurrency: 4 — STREAM_ERROR" test to also
assert that any already-dispatched async writes have settled before the store()
rejection: after catching the CasError from service.store (the failingSource
helper), verify via an observable on the service (e.g., a public counter or
promise that indicates settled writes) or a spy on the underlying write handler
that the number of completed writes matches err.meta.chunksDispatched; if needed
add a small helper on the test-only service fixture to await writes completion
and assert it resolves before finishing the test.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 502c92d and 11eed5f.

📒 Files selected for processing (8)
  • ROADMAP.md
  • bin/ui/progress.js
  • examples/progress-tracking.js
  • index.js
  • src/domain/services/CasService.js
  • src/infrastructure/adapters/EventEmitterObserver.js
  • test/unit/domain/services/CasService.parallel.test.js
  • test/unit/domain/services/CasService.stream-error.test.js
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/infrastructure/adapters/EventEmitterObserver.js

Major fixes:
- Semaphore: guard against permit underflow in release()
- StatsCollector: harden chunk metric against malformed payloads
- CasService: validate observability port at construction time
- CasService: drain in-flight writes before rethrowing CasError
- CasService: drain prefetched reads on restore failure (finally block)

Minor fixes:
- CasService: emit file:restored metric for empty manifests
- ROADMAP: add language tags to fenced code blocks (MD040)
- ROADMAP: escape pipe characters in table cells (MD056)
@flyingrobots
Copy link
Member Author

Re: Drain in-flight writes before rethrowing CasError (outside-diff comment on CasService.js:107-116)

✅ Addressed in commit 54ebfc0. Moved await Promise.allSettled(pending) above the if (err instanceof CasError) guard so all in-flight writes are drained before any rethrow — not just the STREAM_ERROR path.

@flyingrobots
Copy link
Member Author

Re: Semaphore over-release regression test (nit in review body)

✅ Also addressed in commit 54ebfc0. Both the guard (release() throws when #active === 0) and the regression test were added.

@flyingrobots
Copy link
Member Author

@coderabbitai review please 🥕

@coderabbitai
Copy link

coderabbitai bot commented Feb 27, 2026

@flyingrobots, thank you for addressing those two review comments in commit 54ebfc0! I'll review the changes now.

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@flyingrobots flyingrobots merged commit 3654c6a into main Feb 28, 2026
6 checks passed
@flyingrobots flyingrobots deleted the feat/m14-conduit branch February 28, 2026 00:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant