feat!: M14 Conduit — ObservabilityPort, streaming restore, parallel I/O (v4.0.0)#10
feat!: M14 Conduit — ObservabilityPort, streaming restore, parallel I/O (v4.0.0)#10flyingrobots merged 3 commits intomainfrom
Conversation
BREAKING CHANGE: CasService no longer extends EventEmitter. The new ObservabilityPort replaces all emit() calls with metric(channel, data). EventEmitterObserver provides backward-compatible event bridging. - Add ObservabilityPort, SilentObserver, EventEmitterObserver, StatsCollector - Add restoreStream() async generator with O(chunkSize) streaming - Add concurrency option with Semaphore-gated parallel chunk I/O - Rewrite restoreFile() to use createWriteStream + pipeline - Migrate CLI progress tracking to EventEmitterObserver - 567 tests passing, eslint clean, JSR validated
|
Warning Rate limit exceeded
⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. 📒 Files selected for processing (8)
📝 WalkthroughWalkthroughThis PR implements v4.0.0 (Conduit): removes EventEmitter from CasService in favor of a required ObservabilityPort and adapters (SilentObserver, EventEmitterObserver, StatsCollector); adds restoreStream() async generator for streaming restores, Semaphore-based concurrency for parallel chunk I/O, streaming restoreFile(), and updates CLI/examples/tests to use the new observability pattern. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant CasService
participant Semaphore
participant Persistence
participant ObservabilityPort
Client->>CasService: store(manifest, buffer, options)
activate CasService
loop for each chunk
CasService->>Semaphore: acquire()
activate Semaphore
Semaphore-->>CasService: permit
deactivate Semaphore
par write chunk (concurrency-limited)
CasService->>Persistence: writeBlob(chunk)
activate Persistence
Persistence-->>CasService: blobRef
deactivate Persistence
CasService->>ObservabilityPort: metric('chunk',{action:'stored',...})
activate ObservabilityPort
ObservabilityPort-->>CasService: void
deactivate ObservabilityPort
and
CasService->>Semaphore: release()
activate Semaphore
Semaphore-->>Semaphore: wake next waiter
deactivate Semaphore
end
end
CasService->>Persistence: writeTree(manifest)
CasService-->>Client: manifest
deactivate CasService
sequenceDiagram
participant Client
participant CasService
participant Persistence
participant Crypto
participant ObservabilityPort
Client->>CasService: restoreStream({manifest, encryptionKey})
activate CasService
loop each chunkRef in manifest
CasService->>Persistence: readBlob(blobRef)
activate Persistence
Persistence-->>CasService: data
deactivate Persistence
alt encrypted
CasService->>Crypto: decrypt(data, key)
activate Crypto
Crypto-->>CasService: plaintext
deactivate Crypto
end
alt compressed
CasService->>CasService: decompress(plaintext)
end
CasService->>ObservabilityPort: metric('chunk',{action:'restored',...})
activate ObservabilityPort
ObservabilityPort-->>CasService: void
deactivate ObservabilityPort
CasService-->>Client: yield chunk (AsyncIterable<Buffer>)
end
CasService->>ObservabilityPort: metric('file',{action:'restored',...})
deactivate CasService
Estimated code review effort🎯 4 (Complex) | ⏱️ ~70 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (5)
test/unit/domain/services/Semaphore.test.js (1)
46-58: Add a regression test for extrarelease()calls.Coverage is good, but adding an invariant test for over-release would prevent future underflow regressions.
🧪 Suggested additional test
describe('Semaphore – validation', () => { @@ it('throws on concurrency: 1.5', () => { expect(() => new Semaphore(1.5)).toThrow(); }); + + it('throws when release is called without an active permit', () => { + const sem = new Semaphore(1); + expect(() => sem.release()).toThrow(); + }); });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/unit/domain/services/Semaphore.test.js` around lines 46 - 58, Add a regression unit test that asserts over-releasing the Semaphore is not allowed: create a Semaphore instance (use new Semaphore(1)), acquire() once, release() once, then assert that a second release() either throws (expect(() => semaphore.release()).toThrow()) or otherwise fails the invariant; name the test something like "throws on extra release()" and reference the Semaphore class and its release()/acquire() methods so future changes cannot underflow the internal counter.examples/progress-tracking.js (1)
28-31: Remove or consolidate unusedcasinstance.The
casvariable created on line 31 is never used for any operations — all store/restore calls usecas2(lines 113, 136, 160, 197). This creates confusion about which instance should be used.♻️ Consolidate to single CAS instance
-// Initialize plumbing and CAS with an EventEmitterObserver +// Initialize plumbing with an EventEmitterObserver const plumbing = GitPlumbing.createDefault({ cwd: repoDir }); const observer = new EventEmitterObserver(); -const cas = ContentAddressableStore.createJson({ plumbing, chunkSize: 128 * 1024 }); // 128 KB chunks +const cas = new ContentAddressableStore({ + plumbing, + chunkSize: 128 * 1024, + observability: observer, +});Then remove lines 105-109 and use
casthroughout instead ofcas2.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@examples/progress-tracking.js` around lines 28 - 31, The file defines two ContentAddressableStore instances (cas and cas2) but only cas2 is used later; remove the unused duplicate by deleting the cas2 creation (the call to ContentAddressableStore.createJson that assigns cas2) and replace all references to cas2 with cas so the single instance created as cas (using plumbing and EventEmitterObserver) is used for all store/restore operations; ensure any configuration (chunkSize, plumbing, observer) applied to cas2 is preserved on the cas creation if needed.src/infrastructure/adapters/EventEmitterObserver.js (1)
22-32: Consider defensive handling for missingactionfield.If
metric()is called withdatathat doesn't include anactionproperty, the event name becomes"${channel}:undefined", which could cause unexpected behavior for listeners expecting well-formed event names.🛡️ Optional defensive check
metric(channel, data) { + if (!data?.action) { + return; // Skip metrics without action + } const eventName = `${channel}:${data.action}`; if (channel === 'error') {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/infrastructure/adapters/EventEmitterObserver.js` around lines 22 - 32, The metric() method builds an event name from data.action but doesn't guard against missing action; update metric(channel, data) to first validate that data && typeof data.action === 'string' (or at least that 'action' in data) and return early (or emit an error) when it's absent to avoid emitting events like "channel:undefined"; locate the metric function and modify the eventName construction and payload creation (the variables eventName and payload and the this.#emitter emits) so you bail out or handle a default action before calling this.#emitter.emit.index.js (1)
170-184: Factory methods don't forward observability/concurrency options.
createJson()andcreateCbor()don't accept or forwardobservabilityorconcurrencyoptions. This is fine for simple use cases since the defaults (SilentObserver, concurrency: 1) are applied, but consumers needing custom observability must use the constructor directly.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@index.js` around lines 170 - 184, The factory methods createJson and createCbor should accept and forward observability and concurrency options to the ContentAddressableStore constructor; update the signatures of createJson({ plumbing, chunkSize, policy, observability, concurrency }) and createCbor({ plumbing, chunkSize, policy, observability, concurrency }) (or use defaults) and include observability and concurrency in the options passed into new ContentAddressableStore({ plumbing, chunkSize, codec: new JsonCodec()/new CborCodec(), policy, observability, concurrency }) so callers can supply custom observers and concurrency settings.test/unit/domain/services/CasService.events.test.js (1)
180-201: Consider reusing thesetup()helper to reduce duplication.
setupSilent()duplicates the persistence mock and service instantiation fromsetup(). You could parameterizesetup()to accept an optional observer:♻️ Proposed refactor
-function setup() { +function setup(observer = new EventEmitterObserver()) { const crypto = new NodeCryptoAdapter(); const blobStore = new Map(); - const observer = new EventEmitterObserver(); // ... rest of setup } -function setupSilent() { - const crypto = new NodeCryptoAdapter(); - const blobStore = new Map(); - const mockPersistence = { - writeBlob: vi.fn().mockImplementation(async (content) => { - const buf = Buffer.isBuffer(content) ? content : Buffer.from(content); - const oid = await crypto.sha256(buf); - blobStore.set(oid, buf); - return oid; - }), - writeTree: vi.fn().mockResolvedValue('mock-tree-oid'), - readBlob: vi.fn().mockImplementation(async (oid) => { - const buf = blobStore.get(oid); - if (!buf) { throw new Error(`Blob not found: ${oid}`); } - return buf; - }), - }; - return new CasService({ - persistence: mockPersistence, crypto, codec: new JsonCodec(), - observability: new SilentObserver(), chunkSize: 1024, - }); -} +function setupSilent() { + return setup(new SilentObserver()).service; +}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/unit/domain/services/CasService.events.test.js` around lines 180 - 201, The setupSilent() helper duplicates the persistence mock and CasService construction; refactor so setup accepts an optional observer parameter and reuse it: move the mockPersistence creation and NodeCryptoAdapter/JsonCodec/CasService instantiation into setup (or have setup return { service, mockPersistence }) and update setupSilent to call setup(new SilentObserver()) or setup({ observer: new SilentObserver() }); ensure references to setupSilent, setup, mockPersistence, CasService, NodeCryptoAdapter, JsonCodec, and SilentObserver are updated so tests use the single shared setup factory and remove the duplicated mock implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/domain/services/Semaphore.js`:
- Around line 36-42: The release() method in class Semaphore can decrement the
private field `#active` below zero when called without a held permit; update
Semaphore.release to guard against underflow by checking if this.#queue.length
=== 0 then only decrement this.#active if this.#active > 0, otherwise either
clamp to zero or throw/log a clear error/warning about releasing without a
permit; keep the existing branch that shifts and invokes next() on this.#queue,
and reference the Semaphore.release method and private fields `#queue` and `#active`
when making the change.
In `@src/infrastructure/adapters/StatsCollector.js`:
- Around line 14-17: Guard against missing or non-numeric chunk payloads before
updating metrics: in the StatsCollector logic that handles channel === 'chunk'
(where it currently does this.#chunksProcessed++ and this.#bytesTotal +=
data.size || 0), first safely read and normalize size using a safe access (e.g.
data?.size), coerce to a Number and validate with Number.isFinite (fall back to
0 for undefined/NaN/Infinity), then increment this.#chunksProcessed and add the
validated numeric size to this.#bytesTotal so no exception is thrown and
non-numeric values cannot corrupt the total.
---
Nitpick comments:
In `@examples/progress-tracking.js`:
- Around line 28-31: The file defines two ContentAddressableStore instances (cas
and cas2) but only cas2 is used later; remove the unused duplicate by deleting
the cas2 creation (the call to ContentAddressableStore.createJson that assigns
cas2) and replace all references to cas2 with cas so the single instance created
as cas (using plumbing and EventEmitterObserver) is used for all store/restore
operations; ensure any configuration (chunkSize, plumbing, observer) applied to
cas2 is preserved on the cas creation if needed.
In `@index.js`:
- Around line 170-184: The factory methods createJson and createCbor should
accept and forward observability and concurrency options to the
ContentAddressableStore constructor; update the signatures of createJson({
plumbing, chunkSize, policy, observability, concurrency }) and createCbor({
plumbing, chunkSize, policy, observability, concurrency }) (or use defaults) and
include observability and concurrency in the options passed into new
ContentAddressableStore({ plumbing, chunkSize, codec: new JsonCodec()/new
CborCodec(), policy, observability, concurrency }) so callers can supply custom
observers and concurrency settings.
In `@src/infrastructure/adapters/EventEmitterObserver.js`:
- Around line 22-32: The metric() method builds an event name from data.action
but doesn't guard against missing action; update metric(channel, data) to first
validate that data && typeof data.action === 'string' (or at least that 'action'
in data) and return early (or emit an error) when it's absent to avoid emitting
events like "channel:undefined"; locate the metric function and modify the
eventName construction and payload creation (the variables eventName and payload
and the this.#emitter emits) so you bail out or handle a default action before
calling this.#emitter.emit.
In `@test/unit/domain/services/CasService.events.test.js`:
- Around line 180-201: The setupSilent() helper duplicates the persistence mock
and CasService construction; refactor so setup accepts an optional observer
parameter and reuse it: move the mockPersistence creation and
NodeCryptoAdapter/JsonCodec/CasService instantiation into setup (or have setup
return { service, mockPersistence }) and update setupSilent to call setup(new
SilentObserver()) or setup({ observer: new SilentObserver() }); ensure
references to setupSilent, setup, mockPersistence, CasService,
NodeCryptoAdapter, JsonCodec, and SilentObserver are updated so tests use the
single shared setup factory and remove the duplicated mock implementation.
In `@test/unit/domain/services/Semaphore.test.js`:
- Around line 46-58: Add a regression unit test that asserts over-releasing the
Semaphore is not allowed: create a Semaphore instance (use new Semaphore(1)),
acquire() once, release() once, then assert that a second release() either
throws (expect(() => semaphore.release()).toThrow()) or otherwise fails the
invariant; name the test something like "throws on extra release()" and
reference the Semaphore class and its release()/acquire() methods so future
changes cannot underflow the internal counter.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (39)
CHANGELOG.mdREADME.mdROADMAP.mdbin/git-cas.jsexamples/progress-tracking.jsindex.d.tsindex.jsjsr.jsonpackage.jsonsrc/domain/services/CasService.d.tssrc/domain/services/CasService.jssrc/domain/services/Semaphore.jssrc/infrastructure/adapters/EventEmitterObserver.jssrc/infrastructure/adapters/SilentObserver.jssrc/infrastructure/adapters/StatsCollector.jssrc/ports/ObservabilityPort.jstest/unit/cli/progress.test.jstest/unit/domain/services/CasService.codec.test.jstest/unit/domain/services/CasService.compression.test.jstest/unit/domain/services/CasService.crypto.test.jstest/unit/domain/services/CasService.deleteAsset.test.jstest/unit/domain/services/CasService.empty-file.test.jstest/unit/domain/services/CasService.errors.test.jstest/unit/domain/services/CasService.events.test.jstest/unit/domain/services/CasService.findOrphanedChunks.test.jstest/unit/domain/services/CasService.kdf.test.jstest/unit/domain/services/CasService.key-validation.test.jstest/unit/domain/services/CasService.merkle.test.jstest/unit/domain/services/CasService.parallel.test.jstest/unit/domain/services/CasService.readManifest.test.jstest/unit/domain/services/CasService.restore.test.jstest/unit/domain/services/CasService.restoreStream.test.jstest/unit/domain/services/CasService.stream-error.test.jstest/unit/domain/services/CasService.test.jstest/unit/domain/services/Semaphore.test.jstest/unit/infrastructure/adapters/EventEmitterObserver.test.jstest/unit/infrastructure/adapters/SilentObserver.test.jstest/unit/infrastructure/adapters/StatsCollector.test.jstest/unit/ports/ObservabilityPort.test.js
Code review fixes (11 issues): - C1: drain in-flight writes with Promise.allSettled on stream error - M1: remove dead cas/plumbing variables in progress-tracking example - M2: fix stale JSDoc return types in progress.js - m1: merge duplicate node:fs imports in index.js - m2: document restoreStream yielded-chunk semantics - m3: rename chunksWritten → chunksDispatched in STREAM_ERROR - m5: add parallel stream-error test (concurrency: 4) - N1: rename N → readAhead in _restoreStreaming - N3: move eventName after error guard in EventEmitterObserver Roadmap additions: - 6 visions: snapshot trees, portable bundles, manifest diff, CompressionPort, watch mode, interactive passphrase prompt - 7 concerns with mitigations and defensive test specs: memory amplification, orphaned blobs, chunk size cap, Web Crypto buffering, passphrase exposure, KDF rate limiting, GCM nonces
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/domain/services/CasService.js (1)
107-116:⚠️ Potential issue | 🟠 MajorDrain in-flight writes before rethrowing
CasErrorfrom the source stream.If the source stream throws a
CasError(line 108), the code rethrows immediately without awaitingPromise.allSettled(pending), leaving launched writes unresolved. This can create nondeterministic side effects after the caller handles the exception.Suggested fix
} catch (err) { - if (err instanceof CasError) { throw err; } - const casErr = new CasError( - `Stream error during store: ${err.message}`, - 'STREAM_ERROR', - { chunksDispatched: nextIndex, originalError: err }, - ); await Promise.allSettled(pending); + if (err instanceof CasError) { throw err; } + const casErr = new CasError( + `Stream error during store: ${err.message}`, + 'STREAM_ERROR', + { chunksDispatched: nextIndex, originalError: err }, + ); this.observability.metric('error', { code: casErr.code, message: casErr.message }); throw casErr; }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/domain/services/CasService.js` around lines 107 - 116, The catch currently rethrows immediately when err instanceof CasError, leaving pending writes unresolved; change the flow so that in the catch block you always await Promise.allSettled(pending) before rethrowing (i.e., if err instanceof CasError: await Promise.allSettled(pending) then throw err), and for the non-CasError path keep creating casErr, await Promise.allSettled(pending) (as already done), emit the observability.metric, and then throw; this ensures pending (the variable used for launched writes) is drained before any throw from this method.
🧹 Nitpick comments (2)
test/unit/domain/services/CasService.parallel.test.js (2)
61-106: Add assertions that concurrency is actually exercised (not just correctness).These cases verify round-trip integrity, but they don’t prove the semaphore is enforcing/using
concurrency. A fully sequential implementation could still pass. Consider instrumentingwriteBlob/readBlobwith small async delays and tracking max in-flight operations, then assert:
maxInFlight > 1whenconcurrency: 4maxInFlight <= configuredConcurrencyAlso, the
concurrency: 10test currently uses a 1-chunk payload, so it can’t exercise contention.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/unit/domain/services/CasService.parallel.test.js` around lines 61 - 106, Instrument the CAS I/O in the tests to assert concurrency is actually used: wrap or monkeypatch the service methods writeBlob and/or readBlob inside the tests (where setup(), storeBuffer(), restoreStream(), and service are available) to introduce a small async delay and track an in-flight counter that updates before/after each call; record maxInFlight and add assertions that maxInFlight > 1 and maxInFlight <= configured concurrency for the "concurrency: 4" cases. Also modify the "concurrency: 10" test to use a larger payload that produces multiple chunks (instead of the 1-chunk 512-byte buffer) so it can exercise contention, then assert maxInFlight respects the configuredConcurrency there as well. Ensure you restore/undo any monkeypatch after each test.
109-142: Strengthen stream-error test to verify in-flight drain behavior.This test validates error mapping and metadata well, but it doesn’t assert that already-dispatched async writes are settled before rejection. Adding that assertion would protect the regression this PR explicitly fixes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@test/unit/domain/services/CasService.parallel.test.js` around lines 109 - 142, Enhance the "concurrency: 4 — STREAM_ERROR" test to also assert that any already-dispatched async writes have settled before the store() rejection: after catching the CasError from service.store (the failingSource helper), verify via an observable on the service (e.g., a public counter or promise that indicates settled writes) or a spy on the underlying write handler that the number of completed writes matches err.meta.chunksDispatched; if needed add a small helper on the test-only service fixture to await writes completion and assert it resolves before finishing the test.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ROADMAP.md`:
- Line 2406: Several fenced code blocks in ROADMAP.md are using bare triple
backticks (```) which triggers markdownlint rule MD040; update each of those
fences (the occurrences using ```) to include an explicit language identifier
(for example `text`, `json`, `js`, or `shell`) so they read like ```text or
```json etc.; scan for all triple-backtick blocks in ROADMAP.md and replace the
opening fence tokens to include the appropriate language tag to satisfy MD040
and improve readability.
- Line 2520: Escape the unescaped pipe characters inside inline code that are
breaking table columns; specifically update occurrences such as `compression: {
algorithm: 'gzip' | 'zstd' }` (and any other inline code in the same table
cells) to escape the pipe as `\|` so the Markdown table parser treats it as
text, and verify similar occurrences near the
`this.compression.compress(source)` and `this.compression.decompress(buffer)`
mentions are fixed as well (escape any `|` inside inline code in those cells).
In `@src/domain/services/CasService.js`:
- Around line 32-40: The constructor of CasService should validate the
observability adapter at initialization and fail fast: in the constructor ({
persistence, codec, crypto, observability, ... }) check that observability is
present and implements the required methods used by the class (e.g., typeof
observability.metric === 'function' or other expected methods like
counter/timer) and throw a clear Error if not; update the constructor in
src/domain/services/CasService.js to perform this guard before assigning
this.observability so later calls to metric(...) don't throw unexpectedly.
- Around line 484-486: In the restore path that currently returns early on an
empty manifest (checking manifest.chunks.length === 0) emit the same completion
observability metric/handler used for non-empty restores (the "file:restored"
metric) before returning; locate the early-return in CasService.js (the block
referencing manifest.chunks) and either call the existing metric emission helper
(or this.metrics/observability method used elsewhere in CasService) right before
the return or refactor to centralize final completion emission so empty-manifest
restores also trigger "file:restored".
- Around line 543-557: The prefetched promise array (ahead) produced by
readAndVerify() can leak if an awaited promise (await ahead[i % readAhead])
rejects and the generator exits; wrap the main restore loop that consumes ahead
(the for loop iterating over chunks) in a try...finally so that in the finally
block you await settlement of any remaining in-flight promises in ahead (e.g.,
await Promise.allSettled(ahead.filter(Boolean))) to ensure all readAndVerify()
operations finish; keep using readAhead, chunks, ahead, and readAndVerify names
so the fix is easy to locate.
---
Outside diff comments:
In `@src/domain/services/CasService.js`:
- Around line 107-116: The catch currently rethrows immediately when err
instanceof CasError, leaving pending writes unresolved; change the flow so that
in the catch block you always await Promise.allSettled(pending) before
rethrowing (i.e., if err instanceof CasError: await Promise.allSettled(pending)
then throw err), and for the non-CasError path keep creating casErr, await
Promise.allSettled(pending) (as already done), emit the observability.metric,
and then throw; this ensures pending (the variable used for launched writes) is
drained before any throw from this method.
---
Nitpick comments:
In `@test/unit/domain/services/CasService.parallel.test.js`:
- Around line 61-106: Instrument the CAS I/O in the tests to assert concurrency
is actually used: wrap or monkeypatch the service methods writeBlob and/or
readBlob inside the tests (where setup(), storeBuffer(), restoreStream(), and
service are available) to introduce a small async delay and track an in-flight
counter that updates before/after each call; record maxInFlight and add
assertions that maxInFlight > 1 and maxInFlight <= configured concurrency for
the "concurrency: 4" cases. Also modify the "concurrency: 10" test to use a
larger payload that produces multiple chunks (instead of the 1-chunk 512-byte
buffer) so it can exercise contention, then assert maxInFlight respects the
configuredConcurrency there as well. Ensure you restore/undo any monkeypatch
after each test.
- Around line 109-142: Enhance the "concurrency: 4 — STREAM_ERROR" test to also
assert that any already-dispatched async writes have settled before the store()
rejection: after catching the CasError from service.store (the failingSource
helper), verify via an observable on the service (e.g., a public counter or
promise that indicates settled writes) or a spy on the underlying write handler
that the number of completed writes matches err.meta.chunksDispatched; if needed
add a small helper on the test-only service fixture to await writes completion
and assert it resolves before finishing the test.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
ROADMAP.mdbin/ui/progress.jsexamples/progress-tracking.jsindex.jssrc/domain/services/CasService.jssrc/infrastructure/adapters/EventEmitterObserver.jstest/unit/domain/services/CasService.parallel.test.jstest/unit/domain/services/CasService.stream-error.test.js
🚧 Files skipped from review as they are similar to previous changes (1)
- src/infrastructure/adapters/EventEmitterObserver.js
Major fixes: - Semaphore: guard against permit underflow in release() - StatsCollector: harden chunk metric against malformed payloads - CasService: validate observability port at construction time - CasService: drain in-flight writes before rethrowing CasError - CasService: drain prefetched reads on restore failure (finally block) Minor fixes: - CasService: emit file:restored metric for empty manifests - ROADMAP: add language tags to fenced code blocks (MD040) - ROADMAP: escape pipe characters in table cells (MD056)
|
Re: Drain in-flight writes before rethrowing CasError (outside-diff comment on ✅ Addressed in commit 54ebfc0. Moved |
|
Re: Semaphore over-release regression test (nit in review body) ✅ Also addressed in commit 54ebfc0. Both the guard ( |
|
@coderabbitai review please 🥕 |
|
✅ Actions performedReview triggered.
|
Summary
extends EventEmitteronCasService— domain layer no longer depends on Node's event infrastructure. Three adapters:SilentObserver(no-op default),EventEmitterObserver(backward-compat event bridge),StatsCollector(metric accumulator).restoreStream()—AsyncIterable<Buffer>with O(chunkSize) memory for unencrypted files.restoreFile()rewritten to usecreateWriteStream+pipeline.concurrencyoption with countingSemaphore. Store writes and restore reads are gated for configurable parallelism. Defaultconcurrency: 1preserves sequential behavior.EventEmitterObserverper command, attach progress to observer instead of service.Breaking Changes
CasServiceno longer extendsEventEmitter— useEventEmitterObserveradapter for event subscriptionsobservabilityis now a required constructor port forCasService(facade defaults toSilentObserver)Files
Test plan
npm test— 567 tests pass (524 existing + 43 new)npx eslint .— cleannpx jsr publish --dry-run --allow-dirty— passesgit cas store ./testfile --slug test --tree→ progress bar worksgit cas restore --slug test --out ./restored→ progress bar works, file matchesgit cas vault dashboard→ renders correctlySummary by CodeRabbit
New Features
Breaking Changes
Documentation
Tests