Skip to content

feat(scheduler): add NATS execution processor#209

Merged
yordis merged 1 commit into
mainfrom
yordis/nats-scheduler-plan
Jun 10, 2026
Merged

feat(scheduler): add NATS execution processor#209
yordis merged 1 commit into
mainfrom
yordis/nats-scheduler-plan

Conversation

@yordis

@yordis yordis commented Jun 8, 2026

Copy link
Copy Markdown
Member
  • Scheduler execution needs durable coordination so workers can recover across restarts without losing schedule progress.
  • Reconciliation needs auditable checkpoint and event state so processor ownership decisions stay explainable under failure.
  • The processor architecture needs documented boundaries before additional execution behavior lands on top of it.

@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from e6fd4f1 to 69a24b1 Compare June 8, 2026 20:23
@cursor

cursor Bot commented Jun 8, 2026

Copy link
Copy Markdown

PR Summary

High Risk
Introduces core scheduling infrastructure (JetStream publishes/purges, optimistic KV checkpoints, poison/retry semantics) where bugs could duplicate or drop scheduled work.

Overview
Adds a NATS-first execution processor in trogon-scheduler that consumes persisted schedule events, reconciles them into JetStream message schedules, and keeps a rebuildable KV checkpoint per schedule (stream position, status, definition, last reconcile outcome).

New checkpoints_v1 protobuf types define the checkpoint wire format (decoupled from the command event stream). Rust codec maps between event protos and checkpoint protos, can scan corrupt blobs for watermark/event id for idempotency, and persists JSON failure records for undecodable payloads before ack.

Reconciliation is pure: create/pause/resume/remove, stale-event no-ops, RRule unsupported, past-@at expiry with a grace window, purge on pause/remove. Execution writes use purge-then-publish on deterministic subjects so exactly one scheduled message exists regardless of Nats-Msg-Id duplicate windows.

trogon-nats gains JetStreamSubjectPurger (real stream purge + mock). Workspace picks up NATS server_2_12, testcontainers for optional live NATS integration tests (SCHEDULER_NATS_URL).

Reviewed by Cursor Bugbot for commit a66706a. Bugbot is set up for automated code reviews on this repo. Configure here.

@coderabbitai

coderabbitai Bot commented Jun 8, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

This PR implements the JetStream-backed execution scheduler processor, adding reconciliation logic that converts persisted schedule-change events into execution schedule state via checkpoint storage, purge-then-publish operations, and lane-dispatched worker processing. It includes telemetry instrumentation, comprehensive test coverage with in-memory doubles, and live NATS integration tests.

Changes

Scheduler execution pipeline with reconciliation, storage, and lane dispatch

Layer / File(s) Summary
Workspace dependencies, coverage guards, and module structure
rsworkspace/Cargo.toml, rsworkspace/crates/trogon-scheduler/Cargo.toml, rsworkspace/crates/trogon-decider-nats/src/*.rs, rsworkspace/crates/trogon-scheduler/src/lib.rs, rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
Adds base64 workspace dependency; gates test-only decider-nats reads behind cfg(any(test, not(coverage))); expands scheduler dependencies for JetStream, async runtime, telemetry, and testing; declares processor and telemetry module entrypoints.
JetStream purge abstraction and test support
rsworkspace/crates/trogon-nats/src/jetstream/traits.rs, rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs, rsworkspace/crates/trogon-nats/src/jetstream/mod.rs, rsworkspace/crates/trogon-nats/Cargo.toml
Introduces JetStreamSubjectPurger trait with purge_subject_messages, implements it for Stream, adds MockJetStreamPurger test double with failure injection and purged_subjects tracking, wires re-exports, and enables async-nats server_2_12 feature.
Schedule key and subject primitives
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
Defines deterministic UUIDv5-based ScheduleKey and StreamRoutingId for schedule aggregation; adds ScheduleSubject wrapping NATS tokens with execution and event namespace prefixes; wires reconciliation module entrypoint with crate-visible re-exports.
Event decoding from proto to domain changes
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
Implements Go-duration formatting with nanosecond overflow validation; decodes v1::ScheduleEvent proto into domain ScheduleChange with all schedule kinds (At/Every/Cron/RRule), delivery (NATS route + optional TTL/sampling), message (UTF-8 + headers), and timezone/protobuf conversion helpers.
NATS schedule request construction and header validation
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
Builds validated ScheduleRequest with scheduler-owned NATS headers (Nats-Schedule, Nats-Schedule-Target, Nats-Message-ID, correlation headers) and Go-duration TTL/Every encoding; rejects reserved headers case-insensitively and validates delivery-target differs from schedule subject.
Schedule reconciliation state machine
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
Implements reconcile function that converts schedule-change events into Publish/Purge/CheckpointOnly actions with checkpoint advancement; detects stale streams, paused creation, past At expiry, unsupported RRule, and resume-from-removed/corrupt placeholders.
Checkpoint record types and JSON codec
rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rs
Defines ScheduleCheckpointRecord with schedule identity/state/outcome tracking; encodes/decodes checkpoints with embedded buffa-serialized definitions and watermark/event-id metadata; adds ProcessingFailureRecord for durable failure tracking.
KV-backed checkpoint store with optimistic concurrency
rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
Implements ScheduleCheckpointStore for load-by-key/id/subject, save (create for new, update for existing), list with filtering, and idempotent failure recording; distinguishes transient KV errors from codec errors with corrupt metadata extraction.
Execution schedule publishing operations
rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
Implements ExecutionScheduleWriter that purges then publishes schedules with merged trace headers and Nats-Msg-Id; classifies all write errors (Purge/Publish/Ack) as transient.
Metrics and trace propagation
rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs, rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs, rsworkspace/crates/trogon-scheduler/src/telemetry/mod.rs
Adds ProcessorMetrics with OpenTelemetry counters for outcomes/publishes/purges/redeliveries; implements W3C trace-context extraction/injection for propagating headers into execution publishes.
JetStream consumer configuration and delivery abstraction
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
Defines scheduler pull-consumer contract with stream/filter/durable/bucket constants; adds JetStreamDeliveredMessage wrapper implementing DeliveredMessage with redelivery detection and settlement routing (ack/term/retry).
Schedule event processor orchestration
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
Implements ScheduleProcessor with end-to-end flow: decode/load checkpoint/reconcile/apply with OpenTelemetry span linking, outcome metrics, transient/poison classification, corrupt-checkpoint recovery, and durable failure recording.
Per-lane concurrent event dispatcher
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs, rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
Implements spawn_dispatcher creating a lane-based FIFO queue per schedule key with bounded concurrency, strict per-lane serial processing, settlement panic handling, and per-lane report draining; wires worker module re-exports.
In-memory NATS doubles for testing
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
Provides InMemoryKv with transient failure injection, corruption controls, and watermark rewriting; InMemoryExecution with per-subject message history; stream event fixture builders with deterministic event IDs and timestamps.
Live NATS/JetStream end-to-end tests
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
Adds ignored-by-default tests verifying purge-then-publish message convergence, checkpoint persistence, and command-event-to-execution-schedule workflow against live NATS/JetStream infrastructure.

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Possibly related PRs

  • TrogonStack/trogonai#187: Scheduler proto codec error type (ScheduleEventPayloadError) used in event decoding module re-exports.
  • TrogonStack/trogonai#183: Defines v1 schedule proto types (ScheduleCreated, delivery, message structures) embedded in checkpoint codec and decoded by event layer.
  • TrogonStack/trogonai#65: Extends JetStream abstractions in trogon-nats module with additional trait/mock patterns similar to the new JetStreamSubjectPurger.

Poem

🐇 Hops through the scheduler's winding path,
Events decoded, reconciled with care,
Checkpoints saved in ordered rows of state,
Lanes dispatch in concert, none despair—
A rabbit's work made durable and fair!

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch yordis/nats-scheduler-plan

@yordis yordis force-pushed the yordis/nats-scheduler-plan branch 2 times, most recently from dff948f to 6bf1e57 Compare June 8, 2026 20:25
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs Outdated

@enilsen16 enilsen16 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Review: NATS execution processor — requesting changes

Solid, well-documented foundation — the per-lane serialization model, the side-effect→checkpoint→ack ordering, and the idempotent purge-then-publish upsert are all sound. I left 7 inline comments; the blocking concerns are the durability/availability ones, since the whole point of this PR is recovery under failure.

Blocking (please address before merge):

  1. Dispatcher panic isolation (dispatcher.rs) — workers run as bare futures inside the single run task, which is tokio::spawned with an unsupervised JoinHandle. One panic (a poison-pill event, an overflow, a codec edge) aborts the entire dispatcher: all lanes stall, in-flight messages never settle. A single bad message becomes a full-processor outage. Needs per-message panic containment (spawn per worker, or catch_unwind).
  2. list() aborts on one undecodable checkpoint (store.rs) — the recovery/rebuild primitive bails on the first corrupt record despite the doc promising failure keys are skipped. One poison record blocks rebuild for all schedules.
  3. Corrupt checkpoint permanently Terms control events (processor.rs) — a non-decodable checkpoint poisons every subsequent Pause/Resume/Remove, so a misbehaving schedule cannot be stopped in-band.

Should fix:
4. load_by_subject returns hard-coded revision: 0, breaking the optimistic-save contract for any future caller (test-only today, but exported pub with no guard).
5. reports_tx.send().await inside the select! couples report back-pressure to all dispatching + settlement → wedge if the consumer stalls.

Nits: O(lanes) ready-lane scan on the hot path; hand-rolled base64url that duplicates the base64 crate already in the lockfile.

One process flag: items 1–3 are precisely the failure modes the mock-based unit tests cannot reach (they need real JetStream + injected crash/corruption/panic). Worth confirming the SCHEDULER_NATS_URL integration suite actually runs before merge.

Details + suggested directions are in the inline comments.

Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs Outdated
@github-actions

github-actions Bot commented Jun 8, 2026

Copy link
Copy Markdown

badge

Code Coverage Summary

Details
Filename                                                                              Stmts    Miss  Cover    Missing
----------------------------------------------------------------------------------  -------  ------  -------  ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
crates/mcp-nats/src/nats/subjects/mod.rs                                                 89       0  100.00%
crates/trogon-gateway/src/source/telegram/config.rs                                      89       0  100.00%
crates/trogon-gateway/src/source/telegram/registration.rs                               313       0  100.00%
crates/trogon-gateway/src/source/telegram/signature.rs                                   27       0  100.00%
crates/trogon-gateway/src/source/telegram/server.rs                                     339       0  100.00%
crates/mcp-nats/src/telemetry/transport.rs                                                6       0  100.00%
crates/trogon-nats/src/mocks.rs                                                         314       0  100.00%
crates/trogon-nats/src/token.rs                                                           6       0  100.00%
crates/trogon-nats/src/auth.rs                                                          114       0  100.00%
crates/trogon-nats/src/nats_token.rs                                                    157       0  100.00%
crates/trogon-nats/src/client.rs                                                         22      22  0.00%    50-86
crates/trogon-nats/src/subject_token_violation.rs                                        11       0  100.00%
crates/trogon-nats/src/messaging.rs                                                     534       2  99.63%   144, 154
crates/trogon-nats/src/connect.rs                                                        82       6  92.68%   41-46
crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs                   38       0  100.00%
crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs                    407      17  95.82%   102, 120, 124, 132, 224-230, 236, 279-283
crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs                    641      68  89.39%   134, 140, 149, 192, 208-210, 227, 244-246, 415, 417-419, 453-464, 480-481, 486-487, 493-494, 507-508, 513-514, 519-523, 529-530, 545-546, 551-552, 558-559, 566-567, 572-573, 585-589, 595-597, 612-618, 626, 631-633, 643, 648
crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs                     6       0  100.00%
crates/acp-nats-server/src/acp_connection_id.rs                                          37       0  100.00%
crates/acp-nats-server/src/main.rs                                                      896      10  98.88%   100, 231-238, 437
crates/acp-nats-server/src/config.rs                                                    126       3  97.62%   41-43
crates/acp-nats-server/src/connection.rs                                                182      36  80.22%   95-102, 107-122, 138, 140-141, 146, 155-156, 161, 165, 169, 172, 180, 184, 187, 190-194, 232
crates/acp-nats-server/src/transport.rs                                                1915     106  94.46%   253, 512, 530, 557, 611, 616, 635, 647, 766, 789-791, 843, 860-863, 958-961, 1035, 1038, 1041, 1050, 1054, 1057, 1060-1063, 1082, 1114-1117, 1125-1130, 1142-1146, 1150-1159, 1171-1172, 1190-1191, 1201, 1217-1221, 1249-1255, 1275-1277, 1282-1286, 1289-1294, 1311, 1313-1314, 1396-1397, 1409-1410, 1430-1431, 1483-1499, 2204, 2248, 2301, 2357, 2370
crates/trogon-gateway/src/source/linear/server.rs                                       386       0  100.00%
crates/trogon-gateway/src/source/linear/signature.rs                                     54       1  98.15%   16
crates/trogon-gateway/src/source/linear/config.rs                                        17       0  100.00%
crates/trogon-std/src/fs/mem.rs                                                         216      10  95.37%   61-63, 77-79, 132-134, 157
crates/trogon-std/src/fs/system.rs                                                       92       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/session_request_permission.rs               12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/fs_read_text_file.rs                        12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/fs_write_text_file.rs                       12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/terminal_create.rs                          12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/terminal_wait_for_exit.rs                   12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/terminal_output.rs                          12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/terminal_kill.rs                            12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/terminal_release.rs                         12       0  100.00%
crates/acp-nats/src/nats/subjects/client_ops/session_update.rs                           12       0  100.00%
crates/mcp-nats-server/src/config.rs                                                    257       0  100.00%
crates/mcp-nats-server/src/main.rs                                                      357     127  64.43%   149-166, 202-204, 214, 220-221, 228-231, 255-257, 261-270, 292-305, 310-358, 489, 492, 500-542
crates/mcp-nats-server/src/allowed_host.rs                                               87       0  100.00%
crates/trogon-decider-runtime/src/execution.rs                                         1304       0  100.00%
crates/trogon-std/src/telemetry/http.rs                                                 217       0  100.00%
crates/trogon-gateway/src/source/standard_webhooks.rs                                   138       0  100.00%
crates/trogon-gateway/src/source/twitter/config.rs                                       17       0  100.00%
crates/trogon-gateway/src/source/twitter/server.rs                                      525       0  100.00%
crates/trogon-gateway/src/source/twitter/signature.rs                                    58       0  100.00%
crates/trogon-scheduler/src/commands/domain/message.rs                                  219       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule.rs                                 638       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule_event_delivery.rs                   25       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule_event_sampling_source.rs            20       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule_event_schedule.rs                   83       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule_event_status.rs                     10       0  100.00%
crates/trogon-scheduler/src/commands/domain/schedule_id.rs                               81       0  100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs       565      13  97.70%   194-199, 240, 246, 252, 285, 295, 313, 397, 482, 490, 767, 875
crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs               326       3  99.08%   100, 115, 121
crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs           67       0  100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs       48       0  100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs            59       0  100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs             502       0  100.00%
crates/trogon-decider/src/act.rs                                                         62       0  100.00%
crates/trogon-decider/src/testing.rs                                                    654       0  100.00%
crates/trogon-decider/src/events.rs                                                      49       0  100.00%
crates/trogon-decider/src/decision.rs                                                    27       0  100.00%
crates/trogon-decider/src/lib.rs                                                        138       0  100.00%
crates/trogon-service-config/src/lib.rs                                                  92       0  100.00%
crates/trogon-std/src/signal.rs                                                          26      12  53.85%   6-11, 18-25, 34
crates/trogon-std/src/json.rs                                                            30       0  100.00%
crates/trogon-std/src/uuid.rs                                                             7       0  100.00%
crates/trogon-std/src/args.rs                                                            19       9  52.63%   11-28
crates/trogon-std/src/http.rs                                                            19       0  100.00%
crates/trogon-std/src/secret_string.rs                                                   32       0  100.00%
crates/trogon-std/src/duration.rs                                                        42       0  100.00%
crates/acp-nats-agent/src/connection.rs                                                1260       1  99.92%   590
crates/mcp-nats-stdio/src/main.rs                                                       204       0  100.00%
crates/mcp-nats-stdio/src/config.rs                                                     149       0  100.00%
crates/trogon-gateway/src/source/incidentio/incidentio_signing_secret.rs                 56       0  100.00%
crates/trogon-gateway/src/source/incidentio/config.rs                                    16       0  100.00%
crates/trogon-gateway/src/source/incidentio/incidentio_event_type.rs                     62       0  100.00%
crates/trogon-gateway/src/source/incidentio/server.rs                                   343       0  100.00%
crates/trogon-gateway/src/source/incidentio/signature.rs                                206       0  100.00%
crates/trogonai-proto/src/convert.rs                                                    120       0  100.00%
crates/trogonai-proto/src/codec.rs                                                       16       0  100.00%
crates/trogon-gateway/src/source/microsoft_graph/client_state.rs                         30       0  100.00%
crates/trogon-gateway/src/source/microsoft_graph/server.rs                              325       0  100.00%
crates/trogonai-proto/src/scheduler/schedules/codec.rs                                  248       0  100.00%
crates/trogon-scheduler/src/telemetry/trace.rs                                           41       0  100.00%
crates/trogon-scheduler/src/telemetry/metrics.rs                                         52       0  100.00%
crates/acp-nats/src/nats/subjects/global/ext_notify.rs                                    9       0  100.00%
crates/acp-nats/src/nats/subjects/global/ext.rs                                           9       0  100.00%
crates/acp-nats/src/nats/subjects/global/session_list.rs                                  6       0  100.00%
crates/acp-nats/src/nats/subjects/global/authenticate.rs                                  6       0  100.00%
crates/acp-nats/src/nats/subjects/global/initialize.rs                                    6       0  100.00%
crates/acp-nats/src/nats/subjects/global/logout.rs                                        6       0  100.00%
crates/acp-nats/src/nats/subjects/global/session_new.rs                                   6       0  100.00%
crates/trogon-decider-runtime/src/snapshot/mod.rs                                         3       0  100.00%
crates/trogon-decider-runtime/src/snapshot/snapshot_type.rs                              73       0  100.00%
crates/trogon-decider-runtime/src/snapshot/read_snapshot.rs                              11       0  100.00%
crates/trogon-decider-runtime/src/headers/header_map.rs                                  54       3  94.44%   20-22
crates/trogon-decider-runtime/src/headers/header_value.rs                                34       0  100.00%
crates/trogon-decider-runtime/src/headers/header_name.rs                                 28       0  100.00%
crates/trogon-decider-runtime/src/headers/mod.rs                                         74       0  100.00%
crates/trogon-gateway/src/source/slack/config.rs                                         58       0  100.00%
crates/trogon-gateway/src/source/slack/signature.rs                                      66       0  100.00%
crates/trogon-gateway/src/source/slack/server.rs                                        854       0  100.00%
crates/trogon-gateway/src/source/slack/socket_mode.rs                                   716       0  100.00%
crates/trogon-gateway/src/source/gitlab/gitlab_signing_token.rs                          61       0  100.00%
crates/trogon-gateway/src/source/gitlab/server.rs                                       460       0  100.00%
crates/trogon-gateway/src/source/gitlab/signature.rs                                    165       0  100.00%
crates/trogon-decider-nats/src/snapshot_store.rs                                        861      27  96.86%   208-210, 248-250, 361-367, 449, 585, 590, 686-688, 694-696, 730-731, 741-742, 761, 789-790
crates/trogon-decider-nats/src/store.rs                                                 127      47  62.99%   35-160
crates/trogon-decider-nats/src/stream_store.rs                                          659      18  97.27%   70-72, 245, 273-274, 277, 293-297, 464-465, 506, 519-523
crates/acp-nats/src/nats/subjects/mod.rs                                                362       0  100.00%
crates/acp-nats/src/nats/subjects/stream.rs                                              56       0  100.00%
crates/acp-nats-stdio/src/config.rs                                                      66       0  100.00%
crates/acp-nats-stdio/src/main.rs                                                       135      25  81.48%   65, 113-120, 126-128, 145, 174-193
crates/trogon-decider-runtime/src/snapshot/codec/snapshot_payload_decode.rs               3       0  100.00%
crates/trogon-decider-runtime/src/snapshot/codec/snapshot_envelope_encode_error.rs       14       0  100.00%
crates/trogon-decider-runtime/src/snapshot/codec/snapshot_decode_error.rs                49       0  100.00%
crates/trogon-decider-runtime/src/snapshot/codec/encoded_snapshot.rs                    117       0  100.00%
crates/trogon-decider-runtime/src/snapshot/codec/snapshot_encode_error.rs                36       0  100.00%
crates/trogon-decider-runtime/src/snapshot/codec/snapshot_envelope_decode_error.rs       28       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/one_client.rs                            15       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/all_agent.rs                              9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/all_agent_ext.rs                          9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/all_client.rs                             9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/all_session.rs                            9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/global_all.rs                             9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/one_agent.rs                             15       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/prompt_wildcard.rs                        9       0  100.00%
crates/acp-nats/src/nats/subjects/subscriptions/one_session.rs                           12       0  100.00%
crates/trogon-gateway/src/source/discord/config.rs                                      105       0  100.00%
crates/trogon-gateway/src/source/discord/gateway.rs                                     426       1  99.77%   137
crates/acp-nats/src/agent/resume_session.rs                                              90       0  100.00%
crates/acp-nats/src/agent/load_session.rs                                                89       0  100.00%
crates/acp-nats/src/agent/authenticate.rs                                                49       0  100.00%
crates/acp-nats/src/agent/test_support.rs                                               267       0  100.00%
crates/acp-nats/src/agent/logout.rs                                                      49       0  100.00%
crates/acp-nats/src/agent/set_session_model.rs                                           67       0  100.00%
crates/acp-nats/src/agent/set_session_mode.rs                                            67       0  100.00%
crates/acp-nats/src/agent/fork_session.rs                                                94       0  100.00%
crates/acp-nats/src/agent/bridge.rs                                                     123       4  96.75%   108-111
crates/acp-nats/src/agent/ext_method.rs                                                  82       0  100.00%
crates/acp-nats/src/agent/prompt.rs                                                     471       0  100.00%
crates/acp-nats/src/agent/cancel.rs                                                     101       0  100.00%
crates/acp-nats/src/agent/set_session_config_option.rs                                   67       0  100.00%
crates/acp-nats/src/agent/list_sessions.rs                                               47       0  100.00%
crates/acp-nats/src/agent/initialize.rs                                                  79       0  100.00%
crates/acp-nats/src/agent/close_session.rs                                               63       0  100.00%
crates/acp-nats/src/agent/new_session.rs                                                 82       0  100.00%
crates/acp-nats/src/agent/mod.rs                                                         65       0  100.00%
crates/acp-nats/src/agent/ext_notification.rs                                            82       0  100.00%
crates/acp-nats/src/agent/js_request.rs                                                 283       0  100.00%
crates/acp-nats/src/client/rpc_reply.rs                                                  64       0  100.00%
crates/acp-nats/src/client/terminal_release.rs                                          335       0  100.00%
crates/acp-nats/src/client/ext.rs                                                       296       8  97.30%   146-155, 172-181
crates/acp-nats/src/client/fs_read_text_file.rs                                         346       0  100.00%
crates/acp-nats/src/client/fs_write_text_file.rs                                        408       0  100.00%
crates/acp-nats/src/client/terminal_create.rs                                           264       0  100.00%
crates/acp-nats/src/client/mod.rs                                                      2851       0  100.00%
crates/acp-nats/src/client/ext_session_prompt_response.rs                               135       0  100.00%
crates/acp-nats/src/client/terminal_kill.rs                                             278       0  100.00%
crates/acp-nats/src/client/terminal_output.rs                                           206       0  100.00%
crates/acp-nats/src/client/request_permission.rs                                        298       0  100.00%
crates/acp-nats/src/client/terminal_wait_for_exit.rs                                    364       0  100.00%
crates/acp-nats/src/client/session_update.rs                                             55       0  100.00%
crates/acp-nats/src/nats/parsing.rs                                                     278       1  99.64%   151
crates/acp-nats/src/nats/extensions.rs                                                    3       0  100.00%
crates/acp-nats/src/nats/mod.rs                                                          23       0  100.00%
crates/trogon-gateway/src/source_status.rs                                               24       0  100.00%
crates/trogon-gateway/src/http.rs                                                       145       0  100.00%
crates/trogon-gateway/src/source_integration_id.rs                                       55       2  96.36%   58, 60
crates/trogon-gateway/src/streams.rs                                                    129       0  100.00%
crates/trogon-gateway/src/config.rs                                                    2591      42  98.38%   84, 668, 671, 831, 888, 971, 974, 977, 981, 1065-1072, 1149, 1152, 1155, 1160, 1218, 1221, 1224, 1303, 1306, 1309, 1313, 1377, 1380, 1383, 1446, 1449, 1452, 1457, 1532, 1535, 1538, 1543, 1601, 1604, 1607, 1820-1822
crates/trogon-gateway/src/source_plugin.rs                                              269       3  98.88%   82, 141-142
crates/trogon-gateway/src/main.rs                                                       111       0  100.00%
crates/trogon-decider-runtime/src/stream/mod.rs                                          38       0  100.00%
crates/trogon-decider-runtime/src/stream/append_stream.rs                                 5       0  100.00%
crates/trogon-decider-runtime/src/stream/read_stream.rs                                   7       0  100.00%
crates/trogon-decider-runtime/src/stream/stream_position.rs                              26       0  100.00%
crates/trogon-gateway/src/source/sentry/sentry_client_secret.rs                          17       0  100.00%
crates/trogon-gateway/src/source/sentry/server.rs                                       308       0  100.00%
crates/trogon-gateway/src/source/sentry/signature.rs                                     42       0  100.00%
crates/mcp-nats/src/nats/subjects/server/get_task.rs                                     12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/initialize.rs                                   12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/list_tasks.rs                                   12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/list_prompts.rs                                 12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/list_resource_templates.rs                      12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/list_tools.rs                                   12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/logging_message.rs                              12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/ping.rs                                          9       0  100.00%
crates/mcp-nats/src/nats/subjects/server/resource_updated.rs                             12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/subscribe_resource.rs                           12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/complete.rs                                     12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/call_tool.rs                                    12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/prompt_list_changed.rs                          12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/get_prompt.rs                                   12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/cancel_task.rs                                  12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/read_resource.rs                                12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/cancelled.rs                                    12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/elicitation_completed.rs                        12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/resource_list_changed.rs                        12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/set_logging_level.rs                            12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/tool_list_changed.rs                            12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/unsubscribe_resource.rs                         12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/get_task_result.rs                              12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/list_resources.rs                               12       0  100.00%
crates/mcp-nats/src/nats/subjects/server/progress.rs                                     12       0  100.00%
crates/acp-nats/src/nats/subjects/responses/response.rs                                  20       0  100.00%
crates/acp-nats/src/nats/subjects/responses/ext_ready.rs                                 12       0  100.00%
crates/acp-nats/src/nats/subjects/responses/prompt_response.rs                           27       0  100.00%
crates/acp-nats/src/nats/subjects/responses/cancelled.rs                                 15       0  100.00%
crates/acp-nats/src/nats/subjects/responses/update.rs                                    27       0  100.00%
crates/mcp-nats/src/nats/mod.rs                                                          99       0  100.00%
crates/mcp-nats/src/nats/parsing.rs                                                     191       0  100.00%
crates/trogon-nats/src/jetstream/claim_check.rs                                         405      10  97.53%   45-47, 116-122
crates/trogon-nats/src/jetstream/create_conflicts.rs                                     24       0  100.00%
crates/trogon-nats/src/jetstream/publish.rs                                              64       0  100.00%
crates/trogon-nats/src/jetstream/traits.rs                                               46      40  13.04%   181-251
crates/trogon-nats/src/jetstream/stream_max_age.rs                                       18       0  100.00%
crates/trogon-nats/src/jetstream/mocks.rs                                              1686       1  99.94%   505
crates/acp-nats/src/nats/subjects/commands/set_mode.rs                                   15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/cancel.rs                                     15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/fork.rs                                       15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/set_model.rs                                  15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/load.rs                                       15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/resume.rs                                     15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/set_config_option.rs                          15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/close.rs                                      15       0  100.00%
crates/acp-nats/src/nats/subjects/commands/prompt.rs                                     15       0  100.00%
crates/trogon-nats/src/telemetry/messaging.rs                                            82       0  100.00%
crates/trogon-std/src/env/in_memory.rs                                                   73       0  100.00%
crates/trogon-std/src/env/system.rs                                                      17       0  100.00%
crates/acp-nats/src/config.rs                                                           203       0  100.00%
crates/acp-nats/src/session_id.rs                                                        68       0  100.00%
crates/acp-nats/src/ext_method_name.rs                                                   65       0  100.00%
crates/acp-nats/src/lib.rs                                                               69       0  100.00%
crates/acp-nats/src/client_proxy.rs                                                     181       0  100.00%
crates/acp-nats/src/acp_prefix.rs                                                        46       0  100.00%
crates/acp-nats/src/error.rs                                                             82       0  100.00%
crates/acp-nats/src/jsonrpc.rs                                                            6       0  100.00%
crates/acp-nats/src/in_flight_slot_guard.rs                                              32       0  100.00%
crates/acp-nats/src/pending_prompt_waiters.rs                                           131       0  100.00%
crates/acp-nats/src/req_id.rs                                                            39       0  100.00%
crates/trogon-decider-runtime/src/event/codec/event_payload_error.rs                     36       0  100.00%
crates/trogon-decider-runtime/src/event/codec/event_decode.rs                            29       0  100.00%
crates/acp-nats/src/telemetry/metrics.rs                                                 53       0  100.00%
crates/acp-nats/src/jetstream/provision.rs                                               52       0  100.00%
crates/acp-nats/src/jetstream/consumers.rs                                               91       0  100.00%
crates/acp-nats/src/jetstream/streams.rs                                                163       4  97.55%   206-208, 218
crates/acp-nats/src/jetstream/ext_policy.rs                                              26       0  100.00%
crates/trogon-gateway/src/source/github/config.rs                                        17       0  100.00%
crates/trogon-gateway/src/source/github/signature.rs                                     50       0  100.00%
crates/trogon-gateway/src/source/github/server.rs                                       328       0  100.00%
crates/trogon-scheduler/src/commands/snapshot.rs                                          4       0  100.00%
crates/trogon-scheduler/src/commands/state.rs                                           188       0  100.00%
crates/trogon-scheduler/src/commands/pause_schedule.rs                                  158       0  100.00%
crates/trogon-scheduler/src/commands/resume_schedule.rs                                 165       0  100.00%
crates/trogon-scheduler/src/commands/remove_schedule.rs                                 169       0  100.00%
crates/trogon-scheduler/src/commands/create_schedule.rs                                 182       0  100.00%
crates/trogon-decider-runtime/src/event/event_identity.rs                                 3       0  100.00%
crates/trogon-decider-runtime/src/event/mod.rs                                          170       0  100.00%
crates/trogon-decider-runtime/src/event/event_id.rs                                      32       0  100.00%
crates/trogon-decider-runtime/src/event/stream_event.rs                                   8       0  100.00%
crates/mcp-nats/src/server.rs                                                            31       0  100.00%
crates/mcp-nats/src/transport.rs                                                        698       0  100.00%
crates/mcp-nats/src/mcp_peer_id.rs                                                       31       0  100.00%
crates/mcp-nats/src/jsonrpc.rs                                                           22       0  100.00%
crates/mcp-nats/src/mcp_prefix.rs                                                        34       0  100.00%
crates/mcp-nats/src/config.rs                                                           110       0  100.00%
crates/mcp-nats/src/client.rs                                                            31       0  100.00%
crates/trogon-std/src/dirs/fixed.rs                                                      80       0  100.00%
crates/trogon-std/src/dirs/system.rs                                                     71       0  100.00%
crates/trogon-telemetry/src/resource_attribute.rs                                        23       0  100.00%
crates/trogon-telemetry/src/metric.rs                                                    26       1  96.15%   30
crates/trogon-telemetry/src/service_name.rs                                              44       0  100.00%
crates/trogon-telemetry/src/log.rs                                                       70       1  98.57%   35
crates/trogon-telemetry/src/trace.rs                                                     23       1  95.65%   24
crates/trogon-telemetry/src/lib.rs                                                      208      24  88.46%   54, 118, 123, 128, 138-139, 145-163, 199, 202, 205, 211
crates/trogon-nats/src/lease/provision.rs                                               187      10  94.65%   82-92
crates/trogon-nats/src/lease/nats_kv_lease_config.rs                                     26       0  100.00%
crates/trogon-nats/src/lease/renew_interval.rs                                           57       0  100.00%
crates/trogon-nats/src/lease/lease_key.rs                                                19       0  100.00%
crates/trogon-nats/src/lease/renew.rs                                                   246      19  92.28%   23-29, 48-59
crates/trogon-nats/src/lease/lease_timing.rs                                             15       0  100.00%
crates/trogon-nats/src/lease/ttl.rs                                                      68       0  100.00%
crates/trogon-nats/src/lease/mod.rs                                                     523      13  97.51%   113-126
crates/trogon-nats/src/lease/acquire.rs                                                   5       5  0.00%    9-14
crates/trogon-nats/src/lease/release.rs                                                   5       5  0.00%    8-12
crates/trogon-nats/src/lease/lease_bucket.rs                                             19       0  100.00%
crates/mcp-nats/src/nats/subjects/client/list_roots.rs                                   12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/create_elicitation.rs                           12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/create_message.rs                               12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/cancelled.rs                                    12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/initialized.rs                                  12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/roots_list_changed.rs                           12       0  100.00%
crates/mcp-nats/src/nats/subjects/client/ping.rs                                          9       0  100.00%
crates/mcp-nats/src/nats/subjects/client/progress.rs                                     12       0  100.00%
crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs                    911      12  98.68%   171-172, 183, 187-188, 578, 917-923
crates/trogon-scheduler/src/processor/execution/worker/processor.rs                     913      10  98.90%   264, 324, 435-436, 442, 484-487, 579
crates/trogon-scheduler/src/processor/execution/worker/consumer.rs                      203       0  100.00%
crates/trogon-scheduler/src/processor/execution/worker/testkit.rs                       278       0  100.00%
crates/trogon-gateway/src/source/notion/notion_verification_token.rs                     17       0  100.00%
crates/trogon-gateway/src/source/notion/signature.rs                                     45       0  100.00%
crates/trogon-gateway/src/source/notion/verification_token.rs                           220       0  100.00%
crates/trogon-gateway/src/source/notion/notion_event_type.rs                             46       3  93.48%   50-52
crates/trogon-gateway/src/source/notion/server.rs                                       310       4  98.71%   115-116, 135-136
crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs              190       0  100.00%
crates/mcp-nats/src/nats/subjects/subscriptions/all_server.rs                             6       0  100.00%
crates/mcp-nats/src/nats/subjects/subscriptions/all_client.rs                             6       0  100.00%
crates/mcp-nats/src/nats/subjects/subscriptions/one_server.rs                             9       0  100.00%
crates/mcp-nats/src/nats/subjects/subscriptions/one_client.rs                             9       0  100.00%
crates/trogon-std/src/time/mock.rs                                                      125       0  100.00%
crates/trogon-std/src/time/system.rs                                                     31       0  100.00%
TOTAL                                                                                 47453     790  98.34%

Diff against main

Filename                                                                              Stmts    Miss  Cover
----------------------------------------------------------------------------------  -------  ------  --------
crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs                  +38       0  +100.00%
crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs                   +407     +17  +95.82%
crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs                   +641     +68  +89.39%
crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs                    +6       0  +100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs      +565     +13  +97.70%
crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs              +326      +3  +99.08%
crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs          +67       0  +100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs      +48       0  +100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs           +59       0  +100.00%
crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs            +502       0  +100.00%
crates/trogonai-proto/src/convert.rs                                                    +73       0  +100.00%
crates/trogon-scheduler/src/telemetry/trace.rs                                          +41       0  +100.00%
crates/trogon-scheduler/src/telemetry/metrics.rs                                        +52       0  +100.00%
crates/trogon-nats/src/jetstream/traits.rs                                               +6       0  +13.04%
crates/trogon-nats/src/jetstream/mocks.rs                                               +49      -6  +0.37%
crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs                   +911     +12  +98.68%
crates/trogon-scheduler/src/processor/execution/worker/processor.rs                    +913     +10  +98.90%
crates/trogon-scheduler/src/processor/execution/worker/consumer.rs                     +203       0  +100.00%
crates/trogon-scheduler/src/processor/execution/worker/testkit.rs                      +278       0  +100.00%
crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs             +190       0  +100.00%
TOTAL                                                                                 +5375    +117  -0.07%

Results for commit: a66706a

Minimum allowed coverage is 95%

♻️ This comment has been updated with latest results

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (2)
rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs (1)

163-166: 💤 Low value

Shared KV bucket in first test may cause cross-run interference.

The first test uses a fixed STATE_BUCKET name while the second test uses nonce-suffixed bucket names. If the first test is run multiple times in parallel or leaves stale checkpoint data, subsequent runs could load unexpected checkpoint state for a different nonce-based schedule ID (though the key derivation from schedule ID should prevent collisions).

This is acceptable for manual integration tests, but consider using nonce-suffixed bucket names consistently for full isolation.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`
around lines 163 - 166, The test uses a fixed STATE_BUCKET which can leak state
between runs; change the first test to create a nonce-suffixed bucket name
(e.g., append a UUID/timestamp/nonce to STATE_BUCKET) before calling key_value
so the call key_value(&context, ...) uses the unique bucket, and ensure
ScheduleCheckpointStore::new(checkpoint_kv) and the ExecutionScheduleWriter
instantiation continue to use that KV so checkpoints are isolated per test run.
rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs (1)

134-149: ⚡ Quick win

Preserve error sources from proto conversion failures.

Lines 141-142 and 145-146 discard the typed errors from v1::Schedule::try_from and v1::Delivery::try_from, replacing them with a sourceless DefinitionConversion error. As per coding guidelines, never discard error context by converting a typed error into a string; wrap the source error as a field or variant instead.

Add a source field to the DefinitionConversion variant or introduce separate error variants that preserve the underlying TryFrom errors.

Suggested fix to preserve error sources

Option 1: Add a source field to DefinitionConversion:

 #[derive(Debug, thiserror::Error)]
 pub enum CheckpointCodecError {
     // ... other variants ...
-    /// The forward definition conversion to proto failed.
-    #[error("checkpoint definition could not be encoded to proto")]
-    DefinitionConversion,
+    /// The forward definition conversion to proto failed.
+    #[error("checkpoint definition could not be encoded to proto: {source}")]
+    DefinitionConversion {
+        #[source]
+        source: Box<dyn std::error::Error + Send + Sync>,
+    },
 }

Then in encode_checkpoint_record:

         schedule: buffa::MessageField::some(
             v1::Schedule::try_from(&ScheduleEventSchedule::from(&record.schedule))
-                .map_err(|_| CheckpointCodecError::DefinitionConversion)?,
+                .map_err(|e| CheckpointCodecError::DefinitionConversion { source: Box::new(e) })?,
         ),
         delivery: buffa::MessageField::some(
             v1::Delivery::try_from(&ScheduleEventDelivery::from(&record.delivery))
-                .map_err(|_| CheckpointCodecError::DefinitionConversion)?,
+                .map_err(|e| CheckpointCodecError::DefinitionConversion { source: Box::new(e) })?,
         ),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`
around lines 134 - 149, The proto conversion errors from v1::Schedule::try_from
and v1::Delivery::try_from are being swallowed by mapping them to
CheckpointCodecError::DefinitionConversion without preserving the source; update
the error type and the call sites to keep the original error: add a source field
(or distinct variants) to CheckpointCodecError::DefinitionConversion that can
hold the underlying TryFrom error type (or Box<dyn Error + Send + Sync>),
implement/derive the appropriate From or source wiring, and change the map_err
closures around v1::Schedule::try_from and v1::Delivery::try_from to wrap the
original error (e.g., map_err(|e| CheckpointCodecError::DefinitionConversion {
source: e.into() })). Ensure the Error impl (or thiserror attribute) preserves
and returns the source so callers can inspect the original conversion error.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@rsworkspace/crates/trogon-nats/src/jetstream/traits.rs`:
- Around line 110-116: The trait JetStreamSubjectPurger currently forces
discarding the SDK purge response; change it to preserve the SDK type by adding
an associated type (e.g., type PurgeResponse) and update the method signature
purge_subject_messages(&self, subject: &str) -> impl Future<Output =
Result<Self::PurgeResponse, Self::Error>> + Send so production implementations
can be zero-cost passthroughs; update the doc comment to describe that
implementations return the raw SDK purge response and leave any conversion to
callers.
- Around line 292-300: The impl for JetStreamSubjectPurger is discarding the SDK
PurgeResponse and wrapping the result to return (), violating the zero-cost
passthrough guideline; change the impl body of purge_subject_messages on
jetstream::stream::Stream to directly return the SDK call (i.e. make the
function body just self.purge().filter(subject).await) and, if needed, update
the JetStreamSubjectPurger trait signature to return the SDK's Result type (e.g.
Result<stream::PurgeResponse, stream::PurgeError>) so no response mapping or
error conversion is performed.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Around line 145-154: The code uses the local `current` checkpoint without
verifying it belongs to the expected `schedule_id`, so calls like
`reconcile_resumed` and `purge` can mutate/purge the wrong checkpoint; update
the checks around `current` to validate that `current.schedule_id ==
schedule_id.as_str()` (or equivalent comparison) before proceeding and return
`ReconcileError::MissingCheckpoint { schedule_id:
schedule_id.as_str().to_string() }` (or a more specific error) on mismatch;
apply the same validation in the other block referenced (the logic around lines
170-186) so both `reconcile_resumed` and `purge` only operate on checkpoints
whose `schedule_id` matches the requested `schedule_id`.
- Around line 60-62: ReconcileError's MissingCheckpoint variant (and the other
schedule-related variants in the same enum) currently use String for schedule_id
which loses the domain type; change those enum variants to use the domain type
ScheduleId instead of String, update the error message formatting to rely on
ScheduleId's Display/ToString, and fix all constructors/return sites to pass a
ScheduleId (or convert to one via the proper factory) so the type invariants are
preserved; ensure necessary imports and trait impls for ScheduleId are available
where ReconcileError is defined and where it's constructed.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs`:
- Around line 23-25: ScheduleKey::for_stream currently takes a raw &str allowing
wire strings to flow into routing/domain APIs; introduce a small newtype (e.g.,
StreamId or StreamRoutingId) that wraps String (or newtype over str) with
From<String>/From<&str> and an as_bytes()/as_ref() accessor to preserve the
“total over arbitrary strings” behavior, change ScheduleKey::for_stream
signature to accept that new type (e.g., pub fn for_stream(id: StreamId) ->
Self) and use id.as_bytes() when calling Uuid::new_v5, and update call sites
(notably where ScheduleKey::for_stream(&event.stream_id)) to convert the
boundary StreamEvent.stream_id into the new StreamId at the I/O/boundary (e.g.,
ScheduleKey::for_stream(event.stream_id.into())). Ensure the newtype lives near
routing/domain types and add trivial tests for conversion and that Uuid
generation unchanged.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs`:
- Line 186: Replace stringifying errors before calling poison() with a typed
error wrapper: define a PoisonReason enum (e.g., variants Decode { source: E },
Deserialization { source: E }, Execution { source: E }, etc.) that implements
std::error::Error and Display, update ProcessingFailureRecord to store
PoisonReason (or Box<dyn std::error::Error + Send + Sync> / a serde-serializable
wrapper) instead of String, and change the four sites (the Err(error) arms that
currently do Err(format!("event could not be decoded: {error}")) and similar) to
wrap the original error into the appropriate PoisonReason variant (e.g.,
PoisonReason::Decode { source: error }) before passing it to poison(); ensure
PoisonReason/ProcessingFailureRecord are serializable for durable records.

---

Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 134-149: The proto conversion errors from v1::Schedule::try_from
and v1::Delivery::try_from are being swallowed by mapping them to
CheckpointCodecError::DefinitionConversion without preserving the source; update
the error type and the call sites to keep the original error: add a source field
(or distinct variants) to CheckpointCodecError::DefinitionConversion that can
hold the underlying TryFrom error type (or Box<dyn Error + Send + Sync>),
implement/derive the appropriate From or source wiring, and change the map_err
closures around v1::Schedule::try_from and v1::Delivery::try_from to wrap the
original error (e.g., map_err(|e| CheckpointCodecError::DefinitionConversion {
source: e.into() })). Ensure the Error impl (or thiserror attribute) preserves
and returns the source so callers can inspect the original conversion error.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs`:
- Around line 163-166: The test uses a fixed STATE_BUCKET which can leak state
between runs; change the first test to create a nonce-suffixed bucket name
(e.g., append a UUID/timestamp/nonce to STATE_BUCKET) before calling key_value
so the call key_value(&context, ...) uses the unique bucket, and ensure
ScheduleCheckpointStore::new(checkpoint_kv) and the ExecutionScheduleWriter
instantiation continue to use that KV so checkpoints are isolated per test run.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2223c3e2-e550-46e1-ba0c-10a4af3d7e3c

📥 Commits

Reviewing files that changed from the base of the PR and between 88d6ef5 and c3c11ab.

⛔ Files ignored due to path filters (1)
  • rsworkspace/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (33)
  • rsworkspace/Cargo.toml
  • rsworkspace/crates/trogon-decider-nats/src/store.rs
  • rsworkspace/crates/trogon-decider-nats/src/stream_store.rs
  • rsworkspace/crates/trogon-nats/Cargo.toml
  • rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/mod.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
  • rsworkspace/crates/trogon-scheduler/Cargo.toml
  • rsworkspace/crates/trogon-scheduler/src/lib.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs

Comment thread rsworkspace/crates/trogon-nats/src/jetstream/traits.rs Outdated
Comment thread rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs (1)

154-154: 💤 Low value

Avoid unnecessary clone in schedule-id comparison.

The comparison current.schedule_id != schedule_id.clone() allocates a new ScheduleId when a reference comparison suffices. Compare against the dereferenced value instead.

Suggested fix
-    if current.schedule_id != schedule_id.clone() {
+    if &current.schedule_id != schedule_id {

Apply the same change at line 191.

Also applies to: 191-191

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`
at line 154, Replace the unnecessary allocation caused by schedule_id.clone()
when comparing with current.schedule_id: change the comparisons from
"current.schedule_id != schedule_id.clone()" to dereference the schedule_id
reference (e.g., "current.schedule_id != *schedule_id") in reconcile.rs; apply
the same change for the second occurrence around the other comparison at the
noted location (the two places that use current.schedule_id and schedule_id).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs`:
- Around line 69-85: The DeliveredMessage impl for JetStreamDeliveredMessage
currently swallows settlement errors in ack(), term(), and retry() (using `let _
= …`) which hides delivery failures; update the implementation to surface errors
instead: either change the DeliveredMessage trait (in dispatcher.rs) so its
async methods return Result<(), E> (propagate the SDK errors from
JetStreamDeliveredMessage::ack/ack_with through ack(), term(), retry()), or if
changing the trait is not feasible immediately, at minimum log the error results
from self.message.ack().await and self.message.ack_with(...).await in ack(),
term(), and retry() using the existing logging facility so failures are
observable; ensure references to DeliveredMessage, JetStreamDeliveredMessage,
ack(), term(), retry(), and the DeliveredMessage::Output signature (in
dispatcher.rs) are updated/used accordingly.

---

Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs`:
- Line 154: Replace the unnecessary allocation caused by schedule_id.clone()
when comparing with current.schedule_id: change the comparisons from
"current.schedule_id != schedule_id.clone()" to dereference the schedule_id
reference (e.g., "current.schedule_id != *schedule_id") in reconcile.rs; apply
the same change for the second occurrence around the other comparison at the
noted location (the two places that use current.schedule_id and schedule_id).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: a09b8c62-6214-4ec0-b5d1-c9863b36b6d1

📥 Commits

Reviewing files that changed from the base of the PR and between c3c11ab and 47db48a.

📒 Files selected for processing (15)
  • rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
✅ Files skipped from review due to trivial changes (1)
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
🚧 Files skipped from review as they are similar to previous changes (9)
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs (1)

43-48: ⚡ Quick win

DeliveredMessage settlement methods use String for errors.

The trait signature defines Result<(), String> for ack, term, and retry, which violates the coding guideline requiring typed errors. This propagates String-based errors throughout the settlement flow.

Consider defining a SettlementError enum that can capture different failure modes (network errors, protocol errors, etc.) while remaining Send + Sync. The trait could then return Result<(), SettlementError>.

As per coding guidelines: "Errors must be typed—use structs or enums, never String or format!()."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs`
around lines 43 - 48, Change the settlement methods on DeliveredMessage to
return a typed error instead of String: define a Send + Sync enum (e.g.
SettlementError) that models failure cases (e.g. Network, Protocol, NotFound,
Internal) and implement conversions (From) from underlying error types, then
update the trait signatures for ack(), term(), and retry() to return Result<(),
SettlementError>; ensure SettlementError derives/send+sync bounds and update any
callers/propagations to map their String-based errors into the new enum.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs`:
- Around line 43-48: Change the settlement methods on DeliveredMessage to return
a typed error instead of String: define a Send + Sync enum (e.g.
SettlementError) that models failure cases (e.g. Network, Protocol, NotFound,
Internal) and implement conversions (From) from underlying error types, then
update the trait signatures for ack(), term(), and retry() to return Result<(),
SettlementError>; ensure SettlementError derives/send+sync bounds and update any
callers/propagations to map their String-based errors into the new enum.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: b71ff458-18ea-4363-9f72-29985e9cd838

📥 Commits

Reviewing files that changed from the base of the PR and between 47db48a and 8c51efc.

📒 Files selected for processing (10)
  • .github/workflows/ci-rust.yml
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs

@yordis yordis added the rust:coverage-baseline-reset Relax Rust coverage gate to establish a new baseline label Jun 9, 2026
@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from d2fd067 to 402c21a Compare June 9, 2026 04:24

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (1)
rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs (1)

47-49: ⚡ Quick win

Keep the underlying proto conversion error.

Lines 169-174 collapse both TryFrom failures into the unit DefinitionConversion, so the save path loses which conversion failed and why. Keep separate source-carrying variants for schedule and delivery encoding instead of erasing the original typed error here.

As per coding guidelines, "Never discard error context by converting a typed error into a string; wrap the source error as a field or variant instead."

Also applies to: 169-175

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`
around lines 47 - 49, DefinitionConversion in the checkpoint codec erases the
underlying proto conversion errors; change the enum to keep source-carrying
variants (e.g., DefinitionScheduleConversion and DefinitionDeliveryConversion)
that wrap the specific conversion error types returned by the TryFrom
implementations so you don't lose context, then update the TryFrom impls that
currently map both failures to DefinitionConversion to return the specific
wrapped variant (or use #[from] if using thiserror) so the original typed error
is preserved for both schedule and delivery encoding.

Source: Coding guidelines

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@_TODO.md`:
- Around line 21-51: ScheduleCheckpoint is reusing
trogonai.scheduler.schedules.v1.Schedule, .Delivery, and .Message which couples
checkpoint wire format to the event schema; change ScheduleCheckpoint to embed
checkpoint-specific message types (e.g., ScheduleCheckpointSchedule,
ScheduleCheckpointDelivery, ScheduleCheckpointMessage) that duplicate the needed
fields from v1.Schedule/v1.Delivery/v1.Message or alternatively add a clear
comment/contract explaining version stability if you intentionally keep sharing;
update the ScheduleCheckpoint definition to reference the new checkpoint-scoped
message types and copy any required fields so checkpoint evolution is
independent of the event schema.
- Line 61: The comment references a non-existent function name; update the
mention to the actual function `decode_checkpoint_envelope` (or preserve its
behavior) and ensure any text or TODOs that say `decode_checkpoint_watermark`
are corrected to `decode_checkpoint_envelope`; if the intent was to discuss
replacing behavior, reword to “Preserve `decode_checkpoint_envelope` behavior or
replace it with an explicit recovery strategy” and ensure any surrounding
documentation or references (e.g., in checkpoints/codec.rs) point to the real
public function `decode_checkpoint_envelope`.

In `@rsworkspace/crates/trogon-scheduler/Cargo.toml`:
- Line 12: The Cargo.toml for trogon-scheduler lists async-nats with only
features = ["jetstream"] but the repository expects server_2_12 to be provided
by trogon-nats; either explicitly enable the server_2_12 feature on async-nats
in trogon-scheduler's dependency declaration (async-nats = { workspace = true,
features = ["jetstream", "server_2_12"] }) or update the layer/README comment to
state that trogon-nats supplies server_2_12 so developers know the feature is
coming from trogon-nats; adjust the trogon-scheduler Cargo.toml or its
documentation accordingly and ensure references to async_nats::HeaderMap and
async_nats::jetstream::{kv, consumer} remain valid.

In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 121-130: StoredScheduleCheckpoint currently persists a wire-format
v1::ScheduleCreated blob (definition: Vec<u8>) which allows invalid wire shapes
into storage; replace that with a checkpoint-local DTO that mirrors the
validated domain Schedule definition (e.g., fields for schedule name,
cron/expression, timezone, etc.), update StoredScheduleCheckpoint to store that
DTO instead of Vec<u8>, change serialization/deserialization to (de)serialize
the DTO, and modify the load/restore code paths (the code handling
StoredScheduleCheckpoint and the code currently re-validating
v1::ScheduleCreated bytes) to stop decoding and re-validating proto bytes and
instead reconstruct domain objects directly from the DTO; apply the same change
to the other occurrences noted around the blocks that create/consume the
definition blob (the regions you flagged) so that wire types
(v1::ScheduleCreated) are only parsed once at the boundary and never persisted.
- Around line 200-218: The code decodes a ScheduleCreated from stored.definition
but never verifies that the embedded schedule ID matches the routing schedule_id
derived from stored.schedule_id; add a check after
v1::ScheduleCreated::decode_from_slice (after retrieving created.schedule via
created.schedule.as_option()) to compare the decoded schedule's ID (the field
accessed by schedule_from_proto input or created.schedule.schedule_id
equivalent) against the earlier schedule_id (from schedule_id_from) and return
an appropriate CheckpointCodecError::Domain (or other existing domain error) if
they differ, so ScheduleKey::derive/ ScheduleSubject::execution and the decoded
definition cannot be mismatched.

---

Nitpick comments:
In
`@rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs`:
- Around line 47-49: DefinitionConversion in the checkpoint codec erases the
underlying proto conversion errors; change the enum to keep source-carrying
variants (e.g., DefinitionScheduleConversion and DefinitionDeliveryConversion)
that wrap the specific conversion error types returned by the TryFrom
implementations so you don't lose context, then update the TryFrom impls that
currently map both failures to DefinitionConversion to return the specific
wrapped variant (or use #[from] if using thiserror) so the original typed error
is preserved for both schedule and delivery encoding.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 803c669d-735d-42ec-8676-c34ca2ee3e2c

📥 Commits

Reviewing files that changed from the base of the PR and between 66f5901 and 402c21a.

⛔ Files ignored due to path filters (1)
  • rsworkspace/Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (34)
  • _TODO.md
  • rsworkspace/Cargo.toml
  • rsworkspace/crates/trogon-decider-nats/src/store.rs
  • rsworkspace/crates/trogon-decider-nats/src/stream_store.rs
  • rsworkspace/crates/trogon-nats/Cargo.toml
  • rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/mod.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
  • rsworkspace/crates/trogon-scheduler/Cargo.toml
  • rsworkspace/crates/trogon-scheduler/src/lib.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
✅ Files skipped from review due to trivial changes (3)
  • rsworkspace/crates/trogon-nats/Cargo.toml
  • rsworkspace/crates/trogon-scheduler/src/processor/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_subject.rs
🚧 Files skipped from review as they are similar to previous changes (28)
  • rsworkspace/crates/trogon-nats/src/jetstream/mod.rs
  • rsworkspace/Cargo.toml
  • rsworkspace/crates/trogon-decider-nats/src/stream_store.rs
  • rsworkspace/crates/trogon-scheduler/src/lib.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/schedule_key.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/go_duration.rs
  • rsworkspace/crates/trogon-decider-nats/src/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/record.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/metrics.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/mod.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/mod.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/mocks.rs
  • rsworkspace/crates/trogon-scheduler/src/telemetry/trace.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/failure.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/execution_schedules/mod.rs
  • rsworkspace/crates/trogon-nats/src/jetstream/traits.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/consumer.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/request.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/recorded_events.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/testkit.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/nats_execution_tests.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/store.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/dispatcher.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/reconciliation/reconcile.rs
  • rsworkspace/crates/trogon-scheduler/src/processor/execution/worker/processor.rs

Comment thread _TODO.md Outdated
Comment thread _TODO.md Outdated
Comment thread rsworkspace/crates/trogon-scheduler/Cargo.toml Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs Outdated
Comment thread rsworkspace/crates/trogon-scheduler/src/processor/execution/checkpoints/codec.rs Outdated
@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from bfefc42 to 1682289 Compare June 9, 2026 18:57
@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from 1682289 to 8e24844 Compare June 9, 2026 20:03
@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from 8e24844 to ac8b61c Compare June 9, 2026 21:07

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit ac8b61c. Configure here.

@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from ac8b61c to 7d383cd Compare June 9, 2026 21:37
Signed-off-by: Yordis Prieto <yordis.prieto@gmail.com>
@yordis yordis force-pushed the yordis/nats-scheduler-plan branch from 7d383cd to a66706a Compare June 9, 2026 23:06
@yordis yordis merged commit c6708aa into main Jun 10, 2026
7 checks passed
@yordis yordis deleted the yordis/nats-scheduler-plan branch June 10, 2026 01:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

rust:coverage-baseline-reset Relax Rust coverage gate to establish a new baseline

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants