feat(trogon-decider-nats): add JetStream storage adapter#173
Conversation
yordis
commented
May 21, 2026
- Enable decider streams to use durable JetStream-backed persistence.
- Preserve per-stream concurrency guarantees when multiple subjects share one stream.
- Make the adapter behavior reviewable with local and live JetStream coverage.
PR SummaryMedium Risk Overview Updates Reviewed by Cursor Bugbot for commit 497ebee. Bugbot is set up for automated code reviews on this repo. Configure here. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughIntroduces a new NATS JetStream storage adapter crate ( ChangesJetStream Abstractions and Test Infrastructure
JetStream Storage Adapter for Decider Runtime
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Code Coverage SummaryDetailsDiff against mainResults for commit: 497ebee Minimum allowed coverage is ♻️ This comment has been updated with latest results |
There was a problem hiding this comment.
Actionable comments posted: 3
🧹 Nitpick comments (2)
rsworkspace/crates/trogon-decider-nats/src/stream_store.rs (2)
283-289: 💤 Low valueConfusing error context for timestamp conversion failure.
The error source is
message.subject.to_string(), which doesn't describe the timestamp issue. Consider including the actual timestamp value or a descriptive message.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs` around lines 283 - 289, The error created in the recorded_at timestamp conversion (DateTime::<Utc>::from_timestamp) uses message.subject as the error source which is misleading; update the StreamStoreError::read_source call in the recorded_at conversion to include the problematic timestamp (e.g., message.time.unix_timestamp() and/or message.time.nanosecond()) or a descriptive string like "invalid timestamp: <value>" instead of message.subject.to_string(), so the error context accurately reflects the timestamp conversion failure.
330-334: ⚖️ Poor tradeoffString-based error construction via
std::io::Error::other(format!(...)).These patterns lose typed error information. Consider adding dedicated error variants (e.g.,
MissingHeader { name: &'static str },MultiValueHeader { name: String }) to preserve structure and enable programmatic error handling.As per coding guidelines: "Errors must be typed—use structs or enums, never
Stringorformat!()."Also applies to: 348-352
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs` around lines 330 - 334, Replace the string-based io::Error construction with new typed StreamStoreError variants: add enum variants like MultiValueHeader { name: String } and MissingHeader { name: String } (or similar) to StreamStoreError, update the constructor helpers (e.g., the read_source usage) to return these variants instead of std::io::Error::other(format!(...)), and change the failing branches around values.as_slice() / header_name (and the similar block at the other occurrence) to produce the new typed errors so callers can match on MultiValueHeader or MissingHeader instead of parsing strings.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/trogon-decider-nats/README.md`:
- Around line 33-34: The README uses the floating image tag "nats:latest" which
can break tests; update the docker run line to pin a concrete, tested NATS
version (e.g., replace "nats:latest" with a specific tag such as "nats:2.12" or
"nats:2.12.0") so integration tests are reproducible; modify the entry in
README.md where the docker run command appears to use the pinned tag.
In `@rsworkspace/crates/trogon-decider-nats/src/lib.rs`:
- Around line 42-47: Replace the raw String used for JetStream subjects with a
domain-validated value object: define a Subject (or ValidatedSubject) type whose
constructor enforces the JetStream subject rules, change SubjectState.subject
from String to that Subject type, and update any other structs that currently
store a subject string (the other subject-bearing type near lines 60–65) to use
the Subject type as well; ensure the GraphQL/HTTP resolver (boundary) converts
incoming subject strings into Subject exactly once (returning an error on
invalid input) so internal domain code only ever sees the validated Subject
type.
In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs`:
- Around line 112-129: The optimistic-concurrency
(expected_last_subject_sequence) guard is applied to the first message but its
AckFuture is discarded for multi-event batches; change the loop in which
build_publish_message / js.publish_message / publish.outbound_message are used
so that when the current publish carries the expected_last_subject_sequence
(e.g., index==0 && expected_last_subject_sequence.is_some()) you await its ack
immediately and return/map any error (preserving
StreamStoreError::publish_source), and for other messages continue to publish
and only keep the final ack in batch_ack as before.
---
Nitpick comments:
In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs`:
- Around line 283-289: The error created in the recorded_at timestamp conversion
(DateTime::<Utc>::from_timestamp) uses message.subject as the error source which
is misleading; update the StreamStoreError::read_source call in the recorded_at
conversion to include the problematic timestamp (e.g.,
message.time.unix_timestamp() and/or message.time.nanosecond()) or a descriptive
string like "invalid timestamp: <value>" instead of message.subject.to_string(),
so the error context accurately reflects the timestamp conversion failure.
- Around line 330-334: Replace the string-based io::Error construction with new
typed StreamStoreError variants: add enum variants like MultiValueHeader { name:
String } and MissingHeader { name: String } (or similar) to StreamStoreError,
update the constructor helpers (e.g., the read_source usage) to return these
variants instead of std::io::Error::other(format!(...)), and change the failing
branches around values.as_slice() / header_name (and the similar block at the
other occurrence) to produce the new typed errors so callers can match on
MultiValueHeader or MissingHeader instead of parsing strings.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6c734416-fbfb-41f5-be9d-71654b274181
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (6)
rsworkspace/crates/trogon-decider-nats/Cargo.tomlrsworkspace/crates/trogon-decider-nats/README.mdrsworkspace/crates/trogon-decider-nats/src/lib.rsrsworkspace/crates/trogon-decider-nats/src/snapshot_store.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-decider-nats/tests/jetstream.rs
3bf7848 to
014cd2b
Compare
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs`:
- Around line 382-387: enqueue_ack_with_sequence pushes a
MockPublishMessageOutcome::Ack(sequence) but doesn't advance the generator
state, so subsequent auto-assigned publishes can return lower sequences; update
the internal next_sequence after enqueuing an explicit ACK (e.g., set
self.next_sequence to max(self.next_sequence, sequence + 1)) while holding the
same lock used for results so sequence remains monotonic; apply the same pattern
to the other enqueue helpers in the 405-435 range that inject explicit
sequences.
- Around line 556-558: set_bucket_name currently updates the internal
bucket_name lock but status() still returns a hardcoded bucket string, making
the mock inconsistent; modify status() (the method that reads bucket name for
status output) to read and return the current value from the same bucket_name
lock (the same Arc<Mutex/...> used by set_bucket_name) so status() reflects
changes made by set_bucket_name and keeps the mock consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e3a5cad3-6b82-4b12-9407-8c2b7659510c
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (9)
rsworkspace/crates/trogon-decider-nats/Cargo.tomlrsworkspace/crates/trogon-decider-nats/src/lib.rsrsworkspace/crates/trogon-decider-nats/src/snapshot_store.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-decider-nats/tests/jetstream.rsrsworkspace/crates/trogon-nats/Cargo.tomlrsworkspace/crates/trogon-nats/src/jetstream/mocks.rsrsworkspace/crates/trogon-nats/src/jetstream/mod.rsrsworkspace/crates/trogon-nats/src/jetstream/traits.rs
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs (1)
384-394:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMake the explicit ACK enqueue atomic with the sequence bump.
Between Line 386 and Line 391, another task can call
publish_message(), see an emptyresultsqueue, and auto-assignsequence + 1before the queued ACK is visible. That still allows11to be observed before10in concurrent tests.Proposed fix
pub fn enqueue_ack_with_sequence(&self, sequence: u64) { - { - let mut next = self.next_sequence.lock().unwrap(); - if sequence >= *next { - *next = sequence + 1; - } - } - self.results - .lock() - .unwrap() - .push_back(MockPublishMessageOutcome::Ack(sequence)); + let mut results = self.results.lock().unwrap(); + let mut next = self.next_sequence.lock().unwrap(); + if sequence >= *next { + *next = sequence + 1; + } + results.push_back(MockPublishMessageOutcome::Ack(sequence)); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs` around lines 384 - 394, enqueue_ack_with_sequence currently bumps next_sequence under next_sequence.lock() and then separately pushes the Ack into results under results.lock(), allowing a concurrent publish_message to observe the bumped sequence before the Ack is visible; make the bump-and-push atomic by holding a single lock while updating next_sequence and pushing MockPublishMessageOutcome::Ack(sequence) so no other task can interleave (either acquire results.lock() while holding next_sequence.lock(), or refactor to a single Mutex protecting both fields) and ensure publish_message uses the same locking scheme.rsworkspace/crates/trogon-decider-nats/src/snapshot_store.rs (1)
107-135:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftReject invalid snapshot ids before they become KV keys.
snapshot_key::<T>("")produces exactly the key shape thatstream_id_from_snapshot_key::<T>(...)rejects as invalid, so this API can construct snapshot identifiers that its own read path considers impossible. Please movestream_idto a validated value object (or another fallible constructor) and use that type throughoutSnapshotChange,read_snapshot, andwrite_snapshot.As per coding guidelines, "Prefer domain-specific value objects over primitives (e.g.,
AcpPrefixinstead ofString). Each type's factory must guarantee correctness at construction—invalid instances should be unrepresentable."Also applies to: 186-190, 212-224, 291-315
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-decider-nats/src/snapshot_store.rs` around lines 107 - 135, The SnapshotChange API currently accepts raw String stream_ids (via SnapshotChange::upsert and ::delete) which allows invalid snapshot keys (e.g., snapshot_key::<T>("")) that the read path (stream_id_from_snapshot_key::<T>) rejects; create a validated domain value object (e.g., StreamId or StreamIdRef) with a fallible constructor/validator that enforces the valid key invariants, replace all String stream_id fields and constructors in SnapshotChange (Upsert/Delete) to accept this new type, and update read_snapshot, write_snapshot and any callers (including places referenced at lines ~186-190, 212-224, 291-315) to use the validated StreamId so invalid keys cannot be constructed anywhere in the codebase; ensure conversions from &str/String to the value object are explicit and fallible and adjust signatures accordingly.rsworkspace/crates/trogon-decider-nats/src/stream_store.rs (1)
219-220:⚠️ Potential issue | 🟠 Major | ⚡ Quick winMake event-header encoding fallible before calling
PublishMessage::header.
build_publish_messagepassesevent_header_name(...)(currently an infallibleformat!string) intopublish.header(...); inasync-nats0.47.0PublishMessage::headerpanics on invalid header names/values, so malformedevent.headerskeys can crashappend_streaminstead of returningStreamStoreError. Build a fallible NATS header name (e.g.,async_nats::HeaderName::try_from(...)) and return a typedStreamStoreErrorvariant when conversion fails (seebuild_publish_message~219-221 andevent_header_name~363-370, which already contains a TODO for this).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs` around lines 219 - 220, build_publish_message currently calls publish.header(event_header_name(...), value.as_str()) which can panic in async-nats 0.47.0 on invalid header names/values; change this to perform fallible conversions before calling PublishMessage::header: use async_nats::HeaderName::try_from(event_header_name(...)) (and async_nats::HeaderValue::try_from(value.as_str()) if needed) and propagate failures as a StreamStoreError variant (add one if missing) so append_stream returns an Err instead of panicking; update event_header_name to return a &str or raw string and do the try_from conversion at the call site in build_publish_message (~219-221) and map conversion errors into StreamStoreError.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@rsworkspace/crates/trogon-decider-nats/src/snapshot_store.rs`:
- Around line 107-135: The SnapshotChange API currently accepts raw String
stream_ids (via SnapshotChange::upsert and ::delete) which allows invalid
snapshot keys (e.g., snapshot_key::<T>("")) that the read path
(stream_id_from_snapshot_key::<T>) rejects; create a validated domain value
object (e.g., StreamId or StreamIdRef) with a fallible constructor/validator
that enforces the valid key invariants, replace all String stream_id fields and
constructors in SnapshotChange (Upsert/Delete) to accept this new type, and
update read_snapshot, write_snapshot and any callers (including places
referenced at lines ~186-190, 212-224, 291-315) to use the validated StreamId so
invalid keys cannot be constructed anywhere in the codebase; ensure conversions
from &str/String to the value object are explicit and fallible and adjust
signatures accordingly.
In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs`:
- Around line 219-220: build_publish_message currently calls
publish.header(event_header_name(...), value.as_str()) which can panic in
async-nats 0.47.0 on invalid header names/values; change this to perform
fallible conversions before calling PublishMessage::header: use
async_nats::HeaderName::try_from(event_header_name(...)) (and
async_nats::HeaderValue::try_from(value.as_str()) if needed) and propagate
failures as a StreamStoreError variant (add one if missing) so append_stream
returns an Err instead of panicking; update event_header_name to return a &str
or raw string and do the try_from conversion at the call site in
build_publish_message (~219-221) and map conversion errors into
StreamStoreError.
In `@rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs`:
- Around line 384-394: enqueue_ack_with_sequence currently bumps next_sequence
under next_sequence.lock() and then separately pushes the Ack into results under
results.lock(), allowing a concurrent publish_message to observe the bumped
sequence before the Ack is visible; make the bump-and-push atomic by holding a
single lock while updating next_sequence and pushing
MockPublishMessageOutcome::Ack(sequence) so no other task can interleave (either
acquire results.lock() while holding next_sequence.lock(), or refactor to a
single Mutex protecting both fields) and ensure publish_message uses the same
locking scheme.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: eb7a2eb1-c1a1-4a2d-8c32-2d5b4f89f65c
📒 Files selected for processing (5)
rsworkspace/crates/trogon-decider-nats/src/snapshot_store.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-decider-nats/tests/jetstream.rsrsworkspace/crates/trogon-nats/src/jetstream/mocks.rsrsworkspace/crates/trogon-nats/src/jetstream/traits.rs
💤 Files with no reviewable changes (1)
- rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
6685218 to
a9e96d9
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (2)
rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs (1)
544-547: 💤 Low valueDocument the intentional EOF error trick.
The
empty_atomic_batch_member_ack_errorfunction usesserde_json::from_str::<PublishAck>("")to create an EOF error that simulates JetStream's atomic batch member acknowledgment behavior. While this works, the intent is non-obvious. Consider adding a brief comment explaining why this specific error type is used.📝 Suggested documentation
fn empty_atomic_batch_member_ack_error() -> PublishError { + // JetStream returns an empty response (no JSON body) for atomic batch members + // that are staged but not yet committed. The async-nats client surfaces this + // as a serde_json EOF error, which we replicate here for mock fidelity. let source = serde_json::from_str::<PublishAck>("").unwrap_err(); PublishError::with_source(PublishErrorKind::Other, source) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs` around lines 544 - 547, Add a brief comment above the function empty_atomic_batch_member_ack_error explaining that calling serde_json::from_str::<PublishAck>("") intentionally triggers a serde EOF/deserialize error to simulate JetStream's atomic batch member acknowledgment behavior (i.e., an empty payload producing a specific error shape), so future readers know this is deliberate and not a bug; reference the function name empty_atomic_batch_member_ack_error and the use of serde_json::from_str::<PublishAck>("") in the comment.rsworkspace/crates/trogon-decider-nats/src/stream_store.rs (1)
285-293: 💤 Low valueMinor allocation per iteration in
read_subject_range_with_stream_id.The
stream_id.to_string()is called once at line 285, but thenstream_id.clone()is called for every message in the range via the closure at line 291. For streams with many messages, consider passing a reference or usingCowto avoid repeated allocations.♻️ Suggested optimization
async fn read_subject_range_with_stream_id<S>( stream: &S, stream_id: &str, subject: &str, from_sequence: u64, to_sequence: u64, ) -> Result<Vec<StreamEvent>, StreamStoreError> where S: JetStreamGetRawMessage, { - let stream_id = stream_id.to_string(); read_message_range( stream, from_sequence, to_sequence, |message| message.subject.as_str() == subject, - |_| stream_id.clone(), + |_| stream_id.to_string(), ) .await }This removes the intermediate allocation outside the loop and makes the allocation explicit per-message, though you could also change the callback signature to return
&strand haverecord_stream_messagetake a reference if you want to avoid all allocations.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs` around lines 285 - 293, In read_subject_range_with_stream_id: avoid cloning stream_id per message by returning a reference or Cow instead of cloning inside the closure passed to read_message_range; specifically, update the closure passed to read_message_range (currently |_| stream_id.clone()) to return &stream_id (or Cow::Borrowed(&stream_id)) and adjust read_message_range/record_stream_message signatures as needed to accept &str or Cow<'_, str> so you only allocate when necessary.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@rsworkspace/crates/trogon-decider-nats/src/stream_store.rs`:
- Around line 285-293: In read_subject_range_with_stream_id: avoid cloning
stream_id per message by returning a reference or Cow instead of cloning inside
the closure passed to read_message_range; specifically, update the closure
passed to read_message_range (currently |_| stream_id.clone()) to return
&stream_id (or Cow::Borrowed(&stream_id)) and adjust
read_message_range/record_stream_message signatures as needed to accept &str or
Cow<'_, str> so you only allocate when necessary.
In `@rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs`:
- Around line 544-547: Add a brief comment above the function
empty_atomic_batch_member_ack_error explaining that calling
serde_json::from_str::<PublishAck>("") intentionally triggers a serde
EOF/deserialize error to simulate JetStream's atomic batch member acknowledgment
behavior (i.e., an empty payload producing a specific error shape), so future
readers know this is deliberate and not a bug; reference the function name
empty_atomic_batch_member_ack_error and the use of
serde_json::from_str::<PublishAck>("") in the comment.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: dab09198-d988-4f20-a4f8-534c2c00ea1a
📒 Files selected for processing (3)
rsworkspace/crates/trogon-decider-nats/src/lib.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/trogon-decider-nats/src/lib.rs`:
- Around line 108-117: The OptimisticConcurrencyConflictError currently sets
current_position from the value read before the publish attempt which can be
stale when a WrongExpectedVersion occurs; update the logic that constructs
OptimisticConcurrencyConflictError (the code path that detects
WrongExpectedVersion) to re-fetch the latest stream position/metadata (e.g.,
call the same helper that returns StreamPosition or head position) and populate
the current_position field with that refreshed value before returning/raising
the error so retries/assertions see the up-to-date position; apply the same
change to the other constructor/site that creates
OptimisticConcurrencyConflictError (the other occurrence referenced) so both use
the refreshed position.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 33fc4b82-ee15-4dd0-a7bf-1c5db98e94f8
📒 Files selected for processing (1)
rsworkspace/crates/trogon-decider-nats/src/lib.rs
| #[derive(Debug, Clone, PartialEq, Eq)] | ||
| /// Optimistic concurrency conflict details for a failed stream append. | ||
| pub struct OptimisticConcurrencyConflictError { | ||
| /// Domain stream id that was being appended. | ||
| pub stream_id: String, | ||
| /// Expected stream state supplied by the caller. | ||
| pub expected: StreamWritePrecondition, | ||
| /// Current stream position observed before publishing. | ||
| pub current_position: Option<StreamPosition>, | ||
| } |
There was a problem hiding this comment.
Refresh the conflict position after WrongExpectedVersion.
current_position here is the value read before the publish attempt. If another writer wins the race, this error can still report the old sequence (or None), so the new OCC details are unreliable for retries and assertions.
💡 Suggested fix
- let stream_position = append_subject_stream(
- self.as_jetstream(),
- subject_state.subject,
- expected_last_subject_sequence,
- &events,
- )
- .await
- .map_err(|source| match source {
- StreamStoreError::WrongExpectedVersion => {
- JetStreamStoreError::OptimisticConcurrencyConflict(OptimisticConcurrencyConflictError {
- stream_id: stream_id.to_string(),
- expected: expected_state,
- current_position,
- })
- }
- other => JetStreamStoreError::AppendStream(other),
- })?;
+ let stream_position = match append_subject_stream(
+ self.as_jetstream(),
+ subject_state.subject,
+ expected_last_subject_sequence,
+ &events,
+ )
+ .await
+ {
+ Ok(stream_position) => stream_position,
+ Err(StreamStoreError::WrongExpectedVersion) => {
+ let current_position = self
+ .subject_resolver
+ .resolve_subject_state(self.events_stream(), stream_id)
+ .await
+ .map_err(JetStreamStoreError::ResolveSubject)?
+ .current_position;
+
+ return Err(JetStreamStoreError::OptimisticConcurrencyConflict(
+ OptimisticConcurrencyConflictError {
+ stream_id: stream_id.to_string(),
+ expected: expected_state,
+ current_position,
+ },
+ ));
+ }
+ Err(other) => return Err(JetStreamStoreError::AppendStream(other)),
+ };Also applies to: 319-325
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@rsworkspace/crates/trogon-decider-nats/src/lib.rs` around lines 108 - 117,
The OptimisticConcurrencyConflictError currently sets current_position from the
value read before the publish attempt which can be stale when a
WrongExpectedVersion occurs; update the logic that constructs
OptimisticConcurrencyConflictError (the code path that detects
WrongExpectedVersion) to re-fetch the latest stream position/metadata (e.g.,
call the same helper that returns StreamPosition or head position) and populate
the current_position field with that refreshed value before returning/raising
the error so retries/assertions see the up-to-date position; apply the same
change to the other constructor/site that creates
OptimisticConcurrencyConflictError (the other occurrence referenced) so both use
the refreshed position.
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
…Stream Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
… details Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
ef33af1 to
1c95bbc
Compare
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 808dad5. Configure here.
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
