feat(scheduler): add NATS execution processor#209
Conversation
yordis
commented
Jun 8, 2026
- Scheduler execution needs durable coordination so workers can recover across restarts without losing schedule progress.
- Reconciliation needs auditable checkpoint and event state so processor ownership decisions stay explainable under failure.
- The processor architecture needs documented boundaries before additional execution behavior lands on top of it.
e6fd4f1 to
69a24b1
Compare
PR SummaryHigh Risk Overview New Reconciliation is pure: create/pause/resume/remove, stale-event no-ops, RRule unsupported, past-
Reviewed by Cursor Bugbot for commit a66706a. Bugbot is set up for automated code reviews on this repo. Configure here. |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
WalkthroughThis PR implements the JetStream-backed execution scheduler processor, adding reconciliation logic that converts persisted schedule-change events into execution schedule state via checkpoint storage, purge-then-publish operations, and lane-dispatched worker processing. It includes telemetry instrumentation, comprehensive test coverage with in-memory doubles, and live NATS integration tests. ChangesScheduler execution pipeline with reconciliation, storage, and lane dispatch
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Possibly related PRs
Poem
✨ Finishing Touches🧪 Generate unit tests (beta)
|
dff948f to
6bf1e57
Compare
enilsen16
left a comment
There was a problem hiding this comment.
Review: NATS execution processor — requesting changes
Solid, well-documented foundation — the per-lane serialization model, the side-effect→checkpoint→ack ordering, and the idempotent purge-then-publish upsert are all sound. I left 7 inline comments; the blocking concerns are the durability/availability ones, since the whole point of this PR is recovery under failure.
Blocking (please address before merge):
- Dispatcher panic isolation (
dispatcher.rs) — workers run as bare futures inside the singleruntask, which istokio::spawned with an unsupervisedJoinHandle. One panic (a poison-pill event, an overflow, a codec edge) aborts the entire dispatcher: all lanes stall, in-flight messages never settle. A single bad message becomes a full-processor outage. Needs per-message panic containment (spawnper worker, orcatch_unwind). list()aborts on one undecodable checkpoint (store.rs) — the recovery/rebuild primitive bails on the first corrupt record despite the doc promising failure keys are skipped. One poison record blocks rebuild for all schedules.- Corrupt checkpoint permanently Terms control events (
processor.rs) — a non-decodable checkpoint poisons every subsequent Pause/Resume/Remove, so a misbehaving schedule cannot be stopped in-band.
Should fix:
4. load_by_subject returns hard-coded revision: 0, breaking the optimistic-save contract for any future caller (test-only today, but exported pub with no guard).
5. reports_tx.send().await inside the select! couples report back-pressure to all dispatching + settlement → wedge if the consumer stalls.
Nits: O(lanes) ready-lane scan on the hot path; hand-rolled base64url that duplicates the base64 crate already in the lockfile.
One process flag: items 1–3 are precisely the failure modes the mock-based unit tests cannot reach (they need real JetStream + injected crash/corruption/panic). Worth confirming the SCHEDULER_NATS_URL integration suite actually runs before merge.
Details + suggested directions are in the inline comments.
Code Coverage SummaryDetailsDiff against mainResults for commit: a66706a Minimum allowed coverage is ♻️ This comment has been updated with latest results |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (2)
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs (1)
163-166: 💤 Low valueShared KV bucket in first test may cause cross-run interference.
The first test uses a fixed
STATE_BUCKETname while the second test uses nonce-suffixed bucket names. If the first test is run multiple times in parallel or leaves stale checkpoint data, subsequent runs could load unexpected checkpoint state for a different nonce-based schedule ID (though the key derivation from schedule ID should prevent collisions).This is acceptable for manual integration tests, but consider using nonce-suffixed bucket names consistently for full isolation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs` around lines 163 - 166, The test uses a fixed STATE_BUCKET which can leak state between runs; change the first test to create a nonce-suffixed bucket name (e.g., append a UUID/timestamp/nonce to STATE_BUCKET) before calling key_value so the call key_value(&context, ...) uses the unique bucket, and ensure ScheduleCheckpointStore::new(checkpoint_kv) and the ExecutionScheduleWriter instantiation continue to use that KV so checkpoints are isolated per test run.rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs (1)
134-149: ⚡ Quick winPreserve error sources from proto conversion failures.
Lines 141-142 and 145-146 discard the typed errors from
v1::Schedule::try_fromandv1::Delivery::try_from, replacing them with a sourcelessDefinitionConversionerror. As per coding guidelines, never discard error context by converting a typed error into a string; wrap the source error as a field or variant instead.Add a source field to the
DefinitionConversionvariant or introduce separate error variants that preserve the underlyingTryFromerrors.Suggested fix to preserve error sources
Option 1: Add a source field to
DefinitionConversion:#[derive(Debug, thiserror::Error)] pub enum CheckpointCodecError { // ... other variants ... - /// The forward definition conversion to proto failed. - #[error("checkpoint definition could not be encoded to proto")] - DefinitionConversion, + /// The forward definition conversion to proto failed. + #[error("checkpoint definition could not be encoded to proto: {source}")] + DefinitionConversion { + #[source] + source: Box<dyn std::error::Error + Send + Sync>, + }, }Then in
encode_checkpoint_record:schedule: buffa::MessageField::some( v1::Schedule::try_from(&ScheduleEventSchedule::from(&record.schedule)) - .map_err(|_| CheckpointCodecError::DefinitionConversion)?, + .map_err(|e| CheckpointCodecError::DefinitionConversion { source: Box::new(e) })?, ), delivery: buffa::MessageField::some( v1::Delivery::try_from(&ScheduleEventDelivery::from(&record.delivery)) - .map_err(|_| CheckpointCodecError::DefinitionConversion)?, + .map_err(|e| CheckpointCodecError::DefinitionConversion { source: Box::new(e) })?, ),🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs` around lines 134 - 149, The proto conversion errors from v1::Schedule::try_from and v1::Delivery::try_from are being swallowed by mapping them to CheckpointCodecError::DefinitionConversion without preserving the source; update the error type and the call sites to keep the original error: add a source field (or distinct variants) to CheckpointCodecError::DefinitionConversion that can hold the underlying TryFrom error type (or Box<dyn Error + Send + Sync>), implement/derive the appropriate From or source wiring, and change the map_err closures around v1::Schedule::try_from and v1::Delivery::try_from to wrap the original error (e.g., map_err(|e| CheckpointCodecError::DefinitionConversion { source: e.into() })). Ensure the Error impl (or thiserror attribute) preserves and returns the source so callers can inspect the original conversion error.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@rsworkspace/crates/trogon-nats/src/jetstream/traits.rs`:
- Around line 110-116: The trait JetStreamSubjectPurger currently forces
discarding the SDK purge response; change it to preserve the SDK type by adding
an associated type (e.g., type PurgeResponse) and update the method signature
purge_subject_messages(&self, subject: &str) -> impl Future<Output =
Result<Self::PurgeResponse, Self::Error>> + Send so production implementations
can be zero-cost passthroughs; update the doc comment to describe that
implementations return the raw SDK purge response and leave any conversion to
callers.
- Around line 292-300: The impl for JetStreamSubjectPurger is discarding the SDK
PurgeResponse and wrapping the result to return (), violating the zero-cost
passthrough guideline; change the impl body of purge_subject_messages on
jetstream::stream::Stream to directly return the SDK call (i.e. make the
function body just self.purge().filter(subject).await) and, if needed, update
the JetStreamSubjectPurger trait signature to return the SDK's Result type (e.g.
Result<stream::PurgeResponse, stream::PurgeError>) so no response mapping or
error conversion is performed.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Around line 145-154: The code uses the local `current` checkpoint without
verifying it belongs to the expected `schedule_id`, so calls like
`reconcile_resumed` and `purge` can mutate/purge the wrong checkpoint; update
the checks around `current` to validate that `current.schedule_id ==
schedule_id.as_str()` (or equivalent comparison) before proceeding and return
`ReconcileError::MissingCheckpoint { schedule_id:
schedule_id.as_str().to_string() }` (or a more specific error) on mismatch;
apply the same validation in the other block referenced (the logic around lines
170-186) so both `reconcile_resumed` and `purge` only operate on checkpoints
whose `schedule_id` matches the requested `schedule_id`.
- Around line 60-62: ReconcileError's MissingCheckpoint variant (and the other
schedule-related variants in the same enum) currently use String for schedule_id
which loses the domain type; change those enum variants to use the domain type
ScheduleId instead of String, update the error message formatting to rely on
ScheduleId's Display/ToString, and fix all constructors/return sites to pass a
ScheduleId (or convert to one via the proper factory) so the type invariants are
preserved; ensure necessary imports and trait impls for ScheduleId are available
where ReconcileError is defined and where it's constructed.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs`:
- Around line 23-25: ScheduleKey::for_stream currently takes a raw &str allowing
wire strings to flow into routing/domain APIs; introduce a small newtype (e.g.,
StreamId or StreamRoutingId) that wraps String (or newtype over str) with
From<String>/From<&str> and an as_bytes()/as_ref() accessor to preserve the
“total over arbitrary strings” behavior, change ScheduleKey::for_stream
signature to accept that new type (e.g., pub fn for_stream(id: StreamId) ->
Self) and use id.as_bytes() when calling Uuid::new_v5, and update call sites
(notably where ScheduleKey::for_stream(&event.stream_id)) to convert the
boundary StreamEvent.stream_id into the new StreamId at the I/O/boundary (e.g.,
ScheduleKey::for_stream(event.stream_id.into())). Ensure the newtype lives near
routing/domain types and add trivial tests for conversion and that Uuid
generation unchanged.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`:
- Line 186: Replace stringifying errors before calling poison() with a typed
error wrapper: define a PoisonReason enum (e.g., variants Decode { source: E },
Deserialization { source: E }, Execution { source: E }, etc.) that implements
std::error::Error and Display, update ProcessingFailureRecord to store
PoisonReason (or Box<dyn std::error::Error + Send + Sync> / a serde-serializable
wrapper) instead of String, and change the four sites (the Err(error) arms that
currently do Err(format!("event could not be decoded: {error}")) and similar) to
wrap the original error into the appropriate PoisonReason variant (e.g.,
PoisonReason::Decode { source: error }) before passing it to poison(); ensure
PoisonReason/ProcessingFailureRecord are serializable for durable records.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 134-149: The proto conversion errors from v1::Schedule::try_from
and v1::Delivery::try_from are being swallowed by mapping them to
CheckpointCodecError::DefinitionConversion without preserving the source; update
the error type and the call sites to keep the original error: add a source field
(or distinct variants) to CheckpointCodecError::DefinitionConversion that can
hold the underlying TryFrom error type (or Box<dyn Error + Send + Sync>),
implement/derive the appropriate From or source wiring, and change the map_err
closures around v1::Schedule::try_from and v1::Delivery::try_from to wrap the
original error (e.g., map_err(|e| CheckpointCodecError::DefinitionConversion {
source: e.into() })). Ensure the Error impl (or thiserror attribute) preserves
and returns the source so callers can inspect the original conversion error.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 163-166: The test uses a fixed STATE_BUCKET which can leak state
between runs; change the first test to create a nonce-suffixed bucket name
(e.g., append a UUID/timestamp/nonce to STATE_BUCKET) before calling key_value
so the call key_value(&context, ...) uses the unique bucket, and ensure
ScheduleCheckpointStore::new(checkpoint_kv) and the ExecutionScheduleWriter
instantiation continue to use that KV so checkpoints are isolated per test run.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2223c3e2-e550-46e1-ba0c-10a4af3d7e3c
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (33)
rsworkspace/Cargo.tomlrsworkspace/crates/trogon-decider-nats/src/store.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-nats/Cargo.tomlrsworkspace/crates/trogon-nats/src/jetstream/mocks.rsrsworkspace/crates/trogon-nats/src/jetstream/mod.rsrsworkspace/crates/trogon-nats/src/jetstream/traits.rsrsworkspace/crates/trogon-scheduler/Cargo.tomlrsworkspace/crates/trogon-scheduler/src/lib.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rsrsworkspace/crates/trogon-scheduler/src/processor/mod.rsrsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rsrsworkspace/crates/trogon-scheduler/src/telemetry/mod.rsrsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs (1)
154-154: 💤 Low valueAvoid unnecessary clone in schedule-id comparison.
The comparison
current.schedule_id != schedule_id.clone()allocates a newScheduleIdwhen a reference comparison suffices. Compare against the dereferenced value instead.Suggested fix
- if current.schedule_id != schedule_id.clone() { + if ¤t.schedule_id != schedule_id {Apply the same change at line 191.
Also applies to: 191-191
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs` at line 154, Replace the unnecessary allocation caused by schedule_id.clone() when comparing with current.schedule_id: change the comparisons from "current.schedule_id != schedule_id.clone()" to dereference the schedule_id reference (e.g., "current.schedule_id != *schedule_id") in reconcile.rs; apply the same change for the second occurrence around the other comparison at the noted location (the two places that use current.schedule_id and schedule_id).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs`:
- Around line 69-85: The DeliveredMessage impl for JetStreamDeliveredMessage
currently swallows settlement errors in ack(), term(), and retry() (using `let _
= …`) which hides delivery failures; update the implementation to surface errors
instead: either change the DeliveredMessage trait (in dispatcher.rs) so its
async methods return Result<(), E> (propagate the SDK errors from
JetStreamDeliveredMessage::ack/ack_with through ack(), term(), retry()), or if
changing the trait is not feasible immediately, at minimum log the error results
from self.message.ack().await and self.message.ack_with(...).await in ack(),
term(), and retry() using the existing logging facility so failures are
observable; ensure references to DeliveredMessage, JetStreamDeliveredMessage,
ack(), term(), retry(), and the DeliveredMessage::Output signature (in
dispatcher.rs) are updated/used accordingly.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Line 154: Replace the unnecessary allocation caused by schedule_id.clone()
when comparing with current.schedule_id: change the comparisons from
"current.schedule_id != schedule_id.clone()" to dereference the schedule_id
reference (e.g., "current.schedule_id != *schedule_id") in reconcile.rs; apply
the same change for the second occurrence around the other comparison at the
noted location (the two places that use current.schedule_id and schedule_id).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: a09b8c62-6214-4ec0-b5d1-c9863b36b6d1
📒 Files selected for processing (15)
rsworkspace/crates/trogon-nats/src/jetstream/mocks.rsrsworkspace/crates/trogon-nats/src/jetstream/traits.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rsrsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rsrsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
✅ Files skipped from review due to trivial changes (1)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
🚧 Files skipped from review as they are similar to previous changes (9)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
- rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
- rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
- rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
There was a problem hiding this comment.
🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs (1)
43-48: ⚡ Quick win
DeliveredMessagesettlement methods useStringfor errors.The trait signature defines
Result<(), String>forack,term, andretry, which violates the coding guideline requiring typed errors. This propagates String-based errors throughout the settlement flow.Consider defining a
SettlementErrorenum that can capture different failure modes (network errors, protocol errors, etc.) while remainingSend + Sync. The trait could then returnResult<(), SettlementError>.As per coding guidelines: "Errors must be typed—use structs or enums, never
Stringorformat!()."🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs` around lines 43 - 48, Change the settlement methods on DeliveredMessage to return a typed error instead of String: define a Send + Sync enum (e.g. SettlementError) that models failure cases (e.g. Network, Protocol, NotFound, Internal) and implement conversions (From) from underlying error types, then update the trait signatures for ack(), term(), and retry() to return Result<(), SettlementError>; ensure SettlementError derives/send+sync bounds and update any callers/propagations to map their String-based errors into the new enum.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs`:
- Around line 43-48: Change the settlement methods on DeliveredMessage to return
a typed error instead of String: define a Send + Sync enum (e.g.
SettlementError) that models failure cases (e.g. Network, Protocol, NotFound,
Internal) and implement conversions (From) from underlying error types, then
update the trait signatures for ack(), term(), and retry() to return Result<(),
SettlementError>; ensure SettlementError derives/send+sync bounds and update any
callers/propagations to map their String-based errors into the new enum.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: b71ff458-18ea-4363-9f72-29985e9cd838
📒 Files selected for processing (10)
.github/workflows/ci-rust.ymlrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
d2fd067 to
402c21a
Compare
There was a problem hiding this comment.
Actionable comments posted: 5
🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs (1)
47-49: ⚡ Quick winKeep the underlying proto conversion error.
Lines 169-174 collapse both
TryFromfailures into the unitDefinitionConversion, so the save path loses which conversion failed and why. Keep separate source-carrying variants for schedule and delivery encoding instead of erasing the original typed error here.As per coding guidelines, "Never discard error context by converting a typed error into a string; wrap the source error as a field or variant instead."
Also applies to: 169-175
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs` around lines 47 - 49, DefinitionConversion in the checkpoint codec erases the underlying proto conversion errors; change the enum to keep source-carrying variants (e.g., DefinitionScheduleConversion and DefinitionDeliveryConversion) that wrap the specific conversion error types returned by the TryFrom implementations so you don't lose context, then update the TryFrom impls that currently map both failures to DefinitionConversion to return the specific wrapped variant (or use #[from] if using thiserror) so the original typed error is preserved for both schedule and delivery encoding.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@_TODO.md`:
- Around line 21-51: ScheduleCheckpoint is reusing
trogonai.scheduler.schedules.v1.Schedule, .Delivery, and .Message which couples
checkpoint wire format to the event schema; change ScheduleCheckpoint to embed
checkpoint-specific message types (e.g., ScheduleCheckpointSchedule,
ScheduleCheckpointDelivery, ScheduleCheckpointMessage) that duplicate the needed
fields from v1.Schedule/v1.Delivery/v1.Message or alternatively add a clear
comment/contract explaining version stability if you intentionally keep sharing;
update the ScheduleCheckpoint definition to reference the new checkpoint-scoped
message types and copy any required fields so checkpoint evolution is
independent of the event schema.
- Line 61: The comment references a non-existent function name; update the
mention to the actual function `decode_checkpoint_envelope` (or preserve its
behavior) and ensure any text or TODOs that say `decode_checkpoint_watermark`
are corrected to `decode_checkpoint_envelope`; if the intent was to discuss
replacing behavior, reword to “Preserve `decode_checkpoint_envelope` behavior or
replace it with an explicit recovery strategy” and ensure any surrounding
documentation or references (e.g., in checkpoints/codec.rs) point to the real
public function `decode_checkpoint_envelope`.
In `@rsworkspace/crates/trogon-scheduler/Cargo.toml`:
- Line 12: The Cargo.toml for trogon-scheduler lists async-nats with only
features = ["jetstream"] but the repository expects server_2_12 to be provided
by trogon-nats; either explicitly enable the server_2_12 feature on async-nats
in trogon-scheduler's dependency declaration (async-nats = { workspace = true,
features = ["jetstream", "server_2_12"] }) or update the layer/README comment to
state that trogon-nats supplies server_2_12 so developers know the feature is
coming from trogon-nats; adjust the trogon-scheduler Cargo.toml or its
documentation accordingly and ensure references to async_nats::HeaderMap and
async_nats::jetstream::{kv, consumer} remain valid.
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 121-130: StoredScheduleCheckpoint currently persists a wire-format
v1::ScheduleCreated blob (definition: Vec<u8>) which allows invalid wire shapes
into storage; replace that with a checkpoint-local DTO that mirrors the
validated domain Schedule definition (e.g., fields for schedule name,
cron/expression, timezone, etc.), update StoredScheduleCheckpoint to store that
DTO instead of Vec<u8>, change serialization/deserialization to (de)serialize
the DTO, and modify the load/restore code paths (the code handling
StoredScheduleCheckpoint and the code currently re-validating
v1::ScheduleCreated bytes) to stop decoding and re-validating proto bytes and
instead reconstruct domain objects directly from the DTO; apply the same change
to the other occurrences noted around the blocks that create/consume the
definition blob (the regions you flagged) so that wire types
(v1::ScheduleCreated) are only parsed once at the boundary and never persisted.
- Around line 200-218: The code decodes a ScheduleCreated from stored.definition
but never verifies that the embedded schedule ID matches the routing schedule_id
derived from stored.schedule_id; add a check after
v1::ScheduleCreated::decode_from_slice (after retrieving created.schedule via
created.schedule.as_option()) to compare the decoded schedule's ID (the field
accessed by schedule_from_proto input or created.schedule.schedule_id
equivalent) against the earlier schedule_id (from schedule_id_from) and return
an appropriate CheckpointCodecError::Domain (or other existing domain error) if
they differ, so ScheduleKey::derive/ ScheduleSubject::execution and the decoded
definition cannot be mismatched.
---
Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 47-49: DefinitionConversion in the checkpoint codec erases the
underlying proto conversion errors; change the enum to keep source-carrying
variants (e.g., DefinitionScheduleConversion and DefinitionDeliveryConversion)
that wrap the specific conversion error types returned by the TryFrom
implementations so you don't lose context, then update the TryFrom impls that
currently map both failures to DefinitionConversion to return the specific
wrapped variant (or use #[from] if using thiserror) so the original typed error
is preserved for both schedule and delivery encoding.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 803c669d-735d-42ec-8676-c34ca2ee3e2c
⛔ Files ignored due to path filters (1)
rsworkspace/Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (34)
_TODO.mdrsworkspace/Cargo.tomlrsworkspace/crates/trogon-decider-nats/src/store.rsrsworkspace/crates/trogon-decider-nats/src/stream_store.rsrsworkspace/crates/trogon-nats/Cargo.tomlrsworkspace/crates/trogon-nats/src/jetstream/mocks.rsrsworkspace/crates/trogon-nats/src/jetstream/mod.rsrsworkspace/crates/trogon-nats/src/jetstream/traits.rsrsworkspace/crates/trogon-scheduler/Cargo.tomlrsworkspace/crates/trogon-scheduler/src/lib.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rsrsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rsrsworkspace/crates/trogon-scheduler/src/processor/mod.rsrsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rsrsworkspace/crates/trogon-scheduler/src/telemetry/mod.rsrsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
✅ Files skipped from review due to trivial changes (3)
- rsworkspace/crates/trogon-nats/Cargo.toml
- rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs
🚧 Files skipped from review as they are similar to previous changes (28)
- rsworkspace/crates/trogon-nats/src/jetstream/mod.rs
- rsworkspace/Cargo.toml
- rsworkspace/crates/trogon-decider-nats/src/stream_store.rs
- rsworkspace/crates/trogon-scheduler/src/lib.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs
- rsworkspace/crates/trogon-decider-nats/src/store.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
- rsworkspace/crates/trogon-scheduler/src/telemetry/mod.rs
- rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rs
- rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
- rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
- rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
- rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
bfefc42 to
1682289
Compare
1682289 to
8e24844
Compare
8e24844 to
ac8b61c
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit ac8b61c. Configure here.
ac8b61c to
7d383cd
Compare
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
7d383cd to
a66706a
Compare
