Skip to content

feat: implement distributed Shuffle Join (Phase 6)#6

Merged
poyrazK merged 20 commits intomainfrom
feature/phase-6-shuffle-join
Mar 2, 2026
Merged

feat: implement distributed Shuffle Join (Phase 6)#6
poyrazK merged 20 commits intomainfrom
feature/phase-6-shuffle-join

Conversation

@poyrazK
Copy link
Copy Markdown
Owner

@poyrazK poyrazK commented Mar 2, 2026

Description

This PR implements the core infrastructure and orchestration for distributed Shuffle Joins in cloudSQL (Phase 6). It enables the cluster to dynamically re-partition and redistribute large tables based on join keys, allowing for efficient cross-shard join execution.

Key Changes

  • Context-Aware Buffering: Enhanced ClusterManager to isolate shuffle data using a context_id, enabling concurrent distributed joins.
  • Shuffle RPC Infrastructure: Introduced ShuffleFragment RPC and implemented its node-side logic for scanning local shards and pushing partitions to peers.
  • Orchestration Logic: Updated DistributedExecutor to coordinate the multi-phase shuffle join process (Redistribution -> Local Join).
  • Execution Integration: Updated BufferScanOperator and QueryExecutor to correctly scan isolated staging buffers.
  • Advanced Validation: Added new integration tests for shuffle orchestration, buffer isolation, and error handling for non-equality joins.

Validation Results

  • Distributed Tests: 9/9 PASSED (verified Shuffle Join, Broadcast Join, and Shard Pruning).
  • Core Tests: All existing storage and execution tests passing.
  • Build: CLEAN build verified.

Steps to Test

  1. Build: mkdir build && cd build && cmake .. && make -j
  2. Run distributed tests: ./distributed_tests

Summary by CodeRabbit

  • New Features

    • Shuffle joins across shards with per-execution context isolation and explicit context propagation
    • Context-aware buffering and execution propagation for concurrent queries
    • Broadcast-table capability made accessible; query execution can carry an execution context
  • Bug Fixes

    • Improved RPC server thread-safety during worker handling
  • Tests

    • New distributed tests for shuffle orchestration, concurrent isolation, and non-equality join rejection

@poyrazK poyrazK added documentation Improvements or additions to documentation enhancement New feature or request labels Mar 2, 2026
@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 2, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds per-execution context_id propagation and context-scoped shuffle buffering; implements two-phase shuffle-join orchestration (ShuffleFragment + PushData + context-aware buffering), updates RPC (de)serialization for strings and context_id, adjusts executor/operator APIs, refines RpcServer worker synchronization, expands tests and CI flags.

Changes

Cohort / File(s) Summary
Cluster & Shuffle Buffering
include/common/cluster_manager.hpp
Shuffle buffers re-keyed to context_id -> table -> rows. APIs buffer_shuffle_data, has_shuffle_data, fetch_shuffle_data now accept context_id.
Executor: Context Propagation & Operators
include/executor/query_executor.hpp, src/executor/query_executor.cpp, include/executor/operator.hpp, src/executor/operator.cpp
QueryExecutor gains set_context_id + context_id_; BufferScanOperator constructor and instances now accept and store context_id. Call sites updated.
Distributed Executor & Orchestration
include/distributed/distributed_executor.hpp, src/distributed/distributed_executor.cpp
Introduces per-execution context_id generation; adds two-phase shuffle-join orchestration using ShuffleFragment RPCs and context-aware prepare/commit/abort flows. broadcast_table made public.
RPC Messages & Serialization
include/network/rpc_message.hpp
Adds RpcType::ShuffleFragment, ShuffleFragmentArgs; extends ExecuteFragmentArgs and PushDataArgs with context_id; adds Serializer::serialize_string/deserialize_string and updates (de)serialization for related structs.
Main RPC Handler & Shuffle Processing
src/main.cpp
Adds ShuffleFragment RPC handler that scans local shard, partitions by join key, and PushData to peers with context_id; integrates QueryExecutor context usage and context-aware buffering.
RPC Server Concurrency
include/network/rpc_server.hpp, src/network/rpc_server.cpp
Adds worker_mutex_; stop() swaps worker list under lock then joins without holding the lock; accept_loop() guards worker thread creation with the mutex.
Tests
tests/distributed_tests.cpp
Adds/updates tests: BroadcastJoinOrchestration, ShuffleJoinOrchestration, ConcurrentShuffleIsolation, NonEqualityJoinRejection; tracks ShuffleFragment/PushData/ExecuteFragment counts and asserts context isolation and error cases.
Build & CI
.github/workflows/ci.yml, CMakeLists.txt
CI now sets -DSTRICT_LINT=OFF and clears CMAKE_CXX_CLANG_TIDY; CMake clang-tidy lookup skipped when CMAKE_CXX_CLANG_TIDY is pre-defined.
Docs / Plan
plans/CPP_MIGRATION_PLAN.md
Adds Phase 6: Multi-Shard Joins (Shuffle Join) and documents context-aware buffering, shuffle RPCs, and orchestration tasks.

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant DE as DistributedExecutor
    participant RPC as RpcServer
    participant Peer as RemotePeer
    participant Cluster as ClusterManager
    participant Local as LocalShard

    Client->>DE: execute_query(JOIN sql)
    activate DE
    DE->>DE: generate context_id

    Note over DE,Local: Phase 1 - shuffle left table
    DE->>RPC: ShuffleFragment(context_id, left_table, join_key)
    activate RPC
    RPC->>Local: scan local rows
    Local->>RPC: partition rows by join key
    RPC->>Peer: PushData(context_id, table, partitioned_rows) (to each peer)
    Peer-->>RPC: QueryResults
    RPC-->>DE: aggregated QueryResults
    deactivate RPC

    Note over DE,Local: Phase 2 - shuffle right table
    DE->>RPC: ShuffleFragment(context_id, right_table, join_key)
    activate RPC
    RPC->>Local: scan local rows
    Local->>RPC: partition rows by join key
    RPC->>Peer: PushData(context_id, table, partitioned_rows) (to each peer)
    Peer-->>RPC: QueryResults
    RPC-->>DE: aggregated QueryResults
    deactivate RPC

    Note over DE,Cluster: collect buffered data
    DE->>Cluster: fetch_shuffle_data(context_id, left_table)
    Cluster-->>DE: left rows
    DE->>Cluster: fetch_shuffle_data(context_id, right_table)
    Cluster-->>DE: right rows

    DE->>DE: local join on buffered data
    DE-->>Client: results
    deactivate DE
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Poem

🐇 I hop with a context_id snug in my paw,
I scatter rows far, then gather them all,
Two-phase shuffles and messages I draw,
Buffers kept distinct — no cross-talk at all,
A rabbit rejoices as queries enthrall ✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 16.67% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: implement distributed Shuffle Join (Phase 6)' directly and clearly describes the main change—implementing distributed Shuffle Join as Phase 6, which aligns perfectly with the PR objectives and the extensive changes across buffering, RPC infrastructure, and orchestration.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feature/phase-6-shuffle-join

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/distributed/distributed_executor.cpp (1)

307-321: ⚠️ Potential issue | 🔴 Critical

Capture node by value in async fan-out.

Line 307 currently captures &node; this can dangle across async execution and route requests incorrectly.

🐛 Proposed fix
-    for (const auto& node : target_nodes) {
-        query_futures.push_back(std::async(std::launch::async, [&node, fragment_payload]() {
+    for (const auto& node : target_nodes) {
+        query_futures.push_back(std::async(std::launch::async, [node, fragment_payload]() {
             network::RpcClient client(node.address, node.cluster_port);
             network::QueryResultsReply reply;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/distributed/distributed_executor.cpp` around lines 307 - 321, The lambda
passed into query_futures.push_back captures &node which can dangle when the
async runs; change the capture so the lambda captures node by value (e.g.,
[node, fragment_payload]) or copy node into a local variable before creating the
async to ensure the network::RpcClient is constructed with a valid node copy;
update the lambda capture in the async call that constructs network::RpcClient
and uses node.id/node.address/node.cluster_port accordingly.
🧹 Nitpick comments (2)
include/network/rpc_message.hpp (1)

136-156: Use VAL_SIZE_32 constant instead of magic number 4.

The new serialize_string and deserialize_string methods use hardcoded 4 for the length field size, while the rest of the Serializer class uses VAL_SIZE_32. This inconsistency reduces maintainability.

♻️ Proposed fix for consistency
     static void serialize_string(const std::string& s, std::vector<uint8_t>& out) {
         const auto len = static_cast<uint32_t>(s.size());
         const size_t offset = out.size();
-        out.resize(offset + 4 + len);
-        std::memcpy(out.data() + offset, &len, 4);
-        std::memcpy(out.data() + offset + 4, s.data(), len);
+        out.resize(offset + VAL_SIZE_32 + len);
+        std::memcpy(out.data() + offset, &len, VAL_SIZE_32);
+        std::memcpy(out.data() + offset + VAL_SIZE_32, s.data(), len);
     }

     static std::string deserialize_string(const uint8_t* data, size_t& offset, size_t size) {
         uint32_t len = 0;
-        if (offset + 4 <= size) {
-            std::memcpy(&len, data + offset, 4);
-            offset += 4;
+        if (offset + VAL_SIZE_32 <= size) {
+            std::memcpy(&len, data + offset, VAL_SIZE_32);
+            offset += VAL_SIZE_32;
         }
         std::string s;
         if (offset + len <= size) {
             s.assign(reinterpret_cast<const char*>(data + offset), len);
             offset += len;
         }
         return s;
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@include/network/rpc_message.hpp` around lines 136 - 156, Replace the
hardcoded length byte count "4" in serialize_string and deserialize_string with
the VAL_SIZE_32 constant used elsewhere in Serializer to maintain consistency:
in serialize_string use VAL_SIZE_32 when computing resize(offset + VAL_SIZE_32 +
len), when copying the length use memcpy(..., &len, VAL_SIZE_32) and when
copying the string data adjust the second memcpy offset by VAL_SIZE_32; in
deserialize_string replace the conditional checks and memcpy that use 4 with
VAL_SIZE_32 (i.e., check offset + VAL_SIZE_32 <= size, memcpy(&len, data +
offset, VAL_SIZE_32), and advance offset by VAL_SIZE_32) so all size
calculations and copies reference VAL_SIZE_32 uniformly in serialize_string and
deserialize_string.
tests/distributed_tests.cpp (1)

247-249: Make fetch-phase detection less SQL-string brittle.

Matching "SELECT * FROM small_table" directly can make this test flaky with harmless SQL rendering changes. Prefer a structured signal (dedicated RPC arg/flag or normalized parser-level check).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/distributed_tests.cpp` around lines 247 - 249, The test is brittle
because it checks args.sql for the exact string "SELECT * FROM small_table";
instead add or use a structured fetch-phase indicator on the query RPC (e.g., a
boolean flag like args.is_fetch_all or an enum args.phase) and change the test
to detect that flag instead of string matching (update the RPC/query struct and
the caller that sets args to populate args.is_fetch_all, then in tests replace
the args.sql check with a check of args.is_fetch_all while leaving fetch_calls
unchanged).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/distributed/distributed_executor.cpp`:
- Around line 133-140: The loop sending ShuffleFragment currently ignores
failures (connect() and call()), allowing execution to continue with incomplete
redistribution; update the block iterating over data_nodes using
network::RpcClient so that if client.connect() returns false or client.call(...)
indicates failure (e.g., non-success return or empty/invalid resp) you abort the
operation by returning an error/throwing or otherwise signaling failure from the
surrounding function instead of continuing; ensure you propagate a clear
error/status up to the caller and apply the same change to the other
ShuffleFragment loop that mirrors this logic (the second block handling
ShuffleFragment).
- Around line 105-114: The join key strings extracted from the binary expression
(in the block handling join.condition and parser::BinaryExpr) may be qualified
(e.g., "table.col") and must be normalized before packaging into
ShuffleFragmentArgs; update the code that sets left_key and right_key to strip
qualification and retain only the catalog column name (or use the AST node for
the BinaryExpr left()/right() to extract unqualified identifier) and then pass
those normalized names into ShuffleFragmentArgs so downstream lookup by catalog
column name succeeds.

In `@src/main.cpp`:
- Around line 415-427: Guard shard routing by first checking
cluster_manager->get_data_nodes() for emptiness and stabilizing its order before
using its size in cloudsql::cluster::ShardManager::compute_shard; specifically,
if data_nodes is empty skip/handle the tuple (e.g., log or queue) instead of
calling compute_shard, and before computing the shard sort or otherwise
deterministically order data_nodes by a stable key (e.g., data_nodes[].id) so
that the index returned by compute_shard maps consistently into partitions (the
unordered_map partitions and subsequent push_back into
partitions[data_nodes[node_idx].id] must only be executed after confirming
data_nodes.size()>0 and node_idx is within bounds). Ensure references:
cluster_manager->get_data_nodes(), storage::HeapTable::TupleMeta meta, key_idx,
cloudsql::cluster::ShardManager::compute_shard, partitions, and data_nodes are
updated accordingly.
- Around line 432-458: The shuffle currently marks reply.success = true
unconditionally while skipping delivery correctness checks: ensure the loop over
partitions (variables partitions, node_id, rows and data_nodes lookup) does not
treat zero-row partitions as “skip” (still perform the PushData notification)
and verify the RPC result from cloudsql::network::RpcClient (check
client.connect() and the return of client.call for RpcType::PushData and/or the
contents of resp); track a boolean (e.g., overall_success) initialized true, set
it false on connect failure or on a failing client.call/negative response for
any partition, break or continue as appropriate, and only set reply.success =
true when overall_success remains true after all partitions are processed.

In `@src/network/rpc_client.cpp`:
- Line 42: CI flagged a clang-format style mismatch in
src/network/rpc_client.cpp around the variable declaration struct sockaddr_in
addr{};—fix by running the project's formatter (e.g., clang-format -i
src/network/rpc_client.cpp or the repo's configured format command) to reformat
the file, verify the change includes the sockaddr_in line and surrounding
function/class (rpc client related code), then commit the formatted file so the
pipeline passes; ensure you use the repo-specified clang-format version or the
project's formatting script if provided.

In `@src/network/rpc_server.cpp`:
- Line 34: Run the project clang-format style on src/network/rpc_server.cpp to
fix style violations (the brace-initializer spacing for the declaration struct
sockaddr_in addr{}; and the similar occurrence at the other location) — reformat
the file using the repository’s clang-format settings so the declaration and
surrounding lines match the expected style and CI passes.

In `@src/network/server.cpp`:
- Line 123: CI failing due to clang-format style violations in this source; run
clang-format -i on the file to automatically fix formatting (this will correct
the declaration lines such as "struct sockaddr_in addr{}" and the other similar
sockaddr_in initializations around the file), then re-run CI; no code logic
changes needed—just apply the formatter to the file to resolve style mismatches.

In `@src/storage/storage_manager.cpp`:
- Line 194: The CI failure is due to a clang-format style mismatch around the
brace-initialization of the local variable (see struct stat st{} in
storage_manager.cpp); fix it by running your formatter and committing the
result: run clang-format -i src/storage/storage_manager.cpp (or apply the
project's clang-format settings) to reformat the file so the declaration (struct
stat st{}) matches project style, then add and push the updated file.

In `@tests/server_tests.cpp`:
- Line 66: The CI failure is due to clang-format style mismatches in
tests/server_tests.cpp (e.g., the declaration using "struct sockaddr_in addr{}"
at the indicated locations); fix by running clang-format -i on that file or
applying the project's clang-format rules to reformat the file so
spacing/brace/whitespace match style (ensure all occurrences such as the
sockaddr_in declarations at the noted lines are reformatted consistently).

---

Outside diff comments:
In `@src/distributed/distributed_executor.cpp`:
- Around line 307-321: The lambda passed into query_futures.push_back captures
&node which can dangle when the async runs; change the capture so the lambda
captures node by value (e.g., [node, fragment_payload]) or copy node into a
local variable before creating the async to ensure the network::RpcClient is
constructed with a valid node copy; update the lambda capture in the async call
that constructs network::RpcClient and uses
node.id/node.address/node.cluster_port accordingly.

---

Nitpick comments:
In `@include/network/rpc_message.hpp`:
- Around line 136-156: Replace the hardcoded length byte count "4" in
serialize_string and deserialize_string with the VAL_SIZE_32 constant used
elsewhere in Serializer to maintain consistency: in serialize_string use
VAL_SIZE_32 when computing resize(offset + VAL_SIZE_32 + len), when copying the
length use memcpy(..., &len, VAL_SIZE_32) and when copying the string data
adjust the second memcpy offset by VAL_SIZE_32; in deserialize_string replace
the conditional checks and memcpy that use 4 with VAL_SIZE_32 (i.e., check
offset + VAL_SIZE_32 <= size, memcpy(&len, data + offset, VAL_SIZE_32), and
advance offset by VAL_SIZE_32) so all size calculations and copies reference
VAL_SIZE_32 uniformly in serialize_string and deserialize_string.

In `@tests/distributed_tests.cpp`:
- Around line 247-249: The test is brittle because it checks args.sql for the
exact string "SELECT * FROM small_table"; instead add or use a structured
fetch-phase indicator on the query RPC (e.g., a boolean flag like
args.is_fetch_all or an enum args.phase) and change the test to detect that flag
instead of string matching (update the RPC/query struct and the caller that sets
args to populate args.is_fetch_all, then in tests replace the args.sql check
with a check of args.is_fetch_all while leaving fetch_calls unchanged).

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 39dfe13 and ebe4ab5.

📒 Files selected for processing (16)
  • include/common/cluster_manager.hpp
  • include/distributed/distributed_executor.hpp
  • include/executor/operator.hpp
  • include/executor/query_executor.hpp
  • include/network/rpc_message.hpp
  • plans/CPP_MIGRATION_PLAN.md
  • src/distributed/distributed_executor.cpp
  • src/executor/operator.cpp
  • src/executor/query_executor.cpp
  • src/main.cpp
  • src/network/rpc_client.cpp
  • src/network/rpc_server.cpp
  • src/network/server.cpp
  • src/storage/storage_manager.cpp
  • tests/distributed_tests.cpp
  • tests/server_tests.cpp

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
tests/distributed_tests.cpp (2)

303-305: push_calls counter is tracked but never asserted.

The push_calls atomic counter is incremented in the handler (line 315) but no assertion verifies its value. If shuffle operations are expected to generate push calls as part of the data redistribution, consider adding an assertion. If not relevant for this test's scope, consider removing the counter to avoid confusion.

🔧 Option A: Add assertion if push calls are expected
     // Each table (2) should be shuffled on each node (2) = 4 shuffle calls total
     EXPECT_GE(shuffle_calls.load(), 4);
+    // Shuffled data should be pushed between nodes
+    EXPECT_GE(push_calls.load(), 2);
     // Finally, ExecuteFragment should be sent to both nodes
     EXPECT_GE(fragment_calls.load(), 2);
🔧 Option B: Remove unused counter
     std::atomic<int> shuffle_calls{0};
-    std::atomic<int> push_calls{0};
     std::atomic<int> fragment_calls{0};
     
     // ... in handler:
-        } else if (h.type == RpcType::PushData) {
-            push_calls++;
         } else if (h.type == RpcType::ExecuteFragment) {

Also applies to: 358-363

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/distributed_tests.cpp` around lines 303 - 305, The push_calls atomic is
incremented in the request handler but never asserted, causing dead code or a
missing test check; either add an assertion (e.g., compare push_calls.load() to
the expected count after the shuffle completes, alongside existing assertions
for shuffle_calls/fragment_calls) or remove the push_calls declaration and its
increments from the handler to avoid confusion—update the same test block where
shuffle_calls and fragment_calls are asserted to keep assertions consistent.

403-421: Test relies on early validation before network calls — consider adding a mock server for robustness.

The test registers a node on port 7800 (line 407) but doesn't start an RpcServer. This works only if the executor validates the join condition before attempting to connect to nodes. If the implementation changes to defer validation, this test will fail with connection errors rather than the expected "equality join condition" error.

Consider starting a minimal mock server to make the test resilient to implementation changes:

🛡️ Proposed fix to add a mock server
 TEST(DistributedExecutorTests, NonEqualityJoinRejection) {
+    RpcServer node1(7800);
+    node1.set_handler(RpcType::ExecuteFragment,
+                      [](const RpcHeader&, const std::vector<uint8_t>&, int) {
+                          // Should never be called if validation works correctly
+                          FAIL() << "Unexpected network call for non-equality join";
+                      });
+    ASSERT_TRUE(node1.start());
+
     auto catalog = Catalog::create();
     const config::Config config;
     ClusterManager cm(&config);
     cm.register_node("n1", "127.0.0.1", 7800, config::RunMode::Data);
     DistributedExecutor exec(*catalog, cm);
 
     // ... rest of test ...
 
     EXPECT_FALSE(res.success());
     EXPECT_THAT(res.error(), testing::HasSubstr("equality join condition"));
+
+    node1.stop();
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/distributed_tests.cpp` around lines 403 - 421, The test
NonEqualityJoinRejection relies on node registration but does not start an RPC
listener, so if validation is deferred the test will fail with connection
errors; modify the test in tests/distributed_tests.cpp to start a minimal/mock
RpcServer (or similar test double) bound to the same address/port used in
cm.register_node("n1", "127.0.0.1", 7800, ...) before creating
DistributedExecutor exec, ensure the mock server accepts or cleanly rejects
connections for the test duration and is stopped/teardown'ed at the end so the
EXPECT_FALSE(res.success()) and EXPECT_THAT(res.error(),
testing::HasSubstr("equality join condition")) assertions remain robust.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In @.github/workflows/ci.yml:
- Around line 59-60: The CI currently disables static analysis by setting
-DSTRICT_LINT=OFF and -DCMAKE_CXX_CLANG_TIDY="" for all matrix builds; restore
static-analysis as a quality gate by removing or reverting those flags in the
main build matrix (set -DSTRICT_LINT=ON and remove the empty
-DCMAKE_CXX_CLANG_TIDY override so the default clang-tidy runs) and if noise is
a concern, move clang-tidy into a separate non-blocking job that runs the same
configuration with -DCMAKE_CXX_CLANG_TIDY enabled (use the existing matrix but
mark that job as non-blocking) so static analysis is preserved without blocking
developer iteration.

---

Nitpick comments:
In `@tests/distributed_tests.cpp`:
- Around line 303-305: The push_calls atomic is incremented in the request
handler but never asserted, causing dead code or a missing test check; either
add an assertion (e.g., compare push_calls.load() to the expected count after
the shuffle completes, alongside existing assertions for
shuffle_calls/fragment_calls) or remove the push_calls declaration and its
increments from the handler to avoid confusion—update the same test block where
shuffle_calls and fragment_calls are asserted to keep assertions consistent.
- Around line 403-421: The test NonEqualityJoinRejection relies on node
registration but does not start an RPC listener, so if validation is deferred
the test will fail with connection errors; modify the test in
tests/distributed_tests.cpp to start a minimal/mock RpcServer (or similar test
double) bound to the same address/port used in cm.register_node("n1",
"127.0.0.1", 7800, ...) before creating DistributedExecutor exec, ensure the
mock server accepts or cleanly rejects connections for the test duration and is
stopped/teardown'ed at the end so the EXPECT_FALSE(res.success()) and
EXPECT_THAT(res.error(), testing::HasSubstr("equality join condition"))
assertions remain robust.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between ebe4ab5 and 42f8dde.

📒 Files selected for processing (4)
  • .github/workflows/ci.yml
  • include/network/rpc_server.hpp
  • src/network/rpc_server.cpp
  • tests/distributed_tests.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/network/rpc_server.cpp

Comment on lines +59 to +60
-DSTRICT_LINT=OFF \
-DCMAKE_CXX_CLANG_TIDY="" \
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Do not globally disable static-analysis gates in CI.

Line 59 and Line 60 turn off strict lint and effectively disable clang-tidy for all matrix builds. That removes a key quality gate and increases the chance of shipping unsafe or brittle C++ changes.

Suggested adjustment
-          -DSTRICT_LINT=OFF \
-          -DCMAKE_CXX_CLANG_TIDY="" \
+          -DSTRICT_LINT=ON \

If CI noise is the concern, keep clang-tidy in a dedicated non-blocking job rather than disabling it in the main build/test path.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-DSTRICT_LINT=OFF \
-DCMAKE_CXX_CLANG_TIDY="" \
-DSTRICT_LINT=ON \
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In @.github/workflows/ci.yml around lines 59 - 60, The CI currently disables
static analysis by setting -DSTRICT_LINT=OFF and -DCMAKE_CXX_CLANG_TIDY="" for
all matrix builds; restore static-analysis as a quality gate by removing or
reverting those flags in the main build matrix (set -DSTRICT_LINT=ON and remove
the empty -DCMAKE_CXX_CLANG_TIDY override so the default clang-tidy runs) and if
noise is a concern, move clang-tidy into a separate non-blocking job that runs
the same configuration with -DCMAKE_CXX_CLANG_TIDY enabled (use the existing
matrix but mark that job as non-blocking) so static analysis is preserved
without blocking developer iteration.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (2)
src/main.cpp (2)

415-427: ⚠️ Potential issue | 🔴 Critical

Guard shard routing against empty and unstable node ordering.

compute_shard(..., data_nodes.size()) is unsafe when no data nodes are active, and index-based routing is non-deterministic unless data_nodes order is stabilized before indexing.

🐛 Proposed fix
+#include <algorithm>
...
                             auto data_nodes = cluster_manager->get_data_nodes();
+                            if (data_nodes.empty()) {
+                                throw std::runtime_error("No active data nodes for shuffle");
+                            }
+                            std::sort(data_nodes.begin(), data_nodes.end(),
+                                      [](const auto& a, const auto& b) { return a.id < b.id; });

                             std::unordered_map<std::string, std::vector<cloudsql::executor::Tuple>>
                                 partitions;
...
                                     uint32_t node_idx =
                                         cloudsql::cluster::ShardManager::compute_shard(
                                             key_val, static_cast<uint32_t>(data_nodes.size()));
+                                    if (node_idx >= data_nodes.size()) {
+                                        throw std::runtime_error("Shard routing index out of bounds");
+                                    }
                                     partitions[data_nodes[node_idx].id].push_back(
                                         std::move(meta.tuple));
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.cpp` around lines 415 - 427, Guard against empty data_nodes and
stabilize their ordering before using index-based routing: after calling
cluster_manager->get_data_nodes(), check if data_nodes.empty() and handle that
case (return/error/skip) instead of calling ShardManager::compute_shard; also
produce a deterministic order (e.g., sort data_nodes by id) before computing
node_idx so data_nodes[node_idx].id is stable when populating partitions. Ensure
you reference data_nodes, cluster_manager->get_data_nodes(),
cloudsql::cluster::ShardManager::compute_shard, and partitions when implementing
the checks and ordering.

432-458: ⚠️ Potential issue | 🔴 Critical

Do not mark shuffle successful on partial/failed partition delivery.

The current loop ignores PushData connect/call/response failures and still sets reply.success = true. That can silently lose rows while reporting success.

🐛 Proposed fix
-                            // 2. Push partitions to peers
-                            for (auto& [node_id, rows] : partitions) {
-                                // Find node info
-                                const cloudsql::cluster::NodeInfo* target_node = nullptr;
-                                for (const auto& n : data_nodes) {
-                                    if (n.id == node_id) {
-                                        target_node = &n;
-                                        break;
-                                    }
-                                }
-
-                                if (target_node != nullptr) {
-                                    cloudsql::network::RpcClient client(target_node->address,
-                                                                        target_node->cluster_port);
-                                    if (client.connect()) {
-                                        cloudsql::network::PushDataArgs push_args;
-                                        push_args.context_id = args.context_id;
-                                        push_args.table_name = args.table_name;
-                                        push_args.rows = std::move(rows);
-                                        std::vector<uint8_t> resp;
-                                        static_cast<void>(
-                                            client.call(cloudsql::network::RpcType::PushData,
-                                                        push_args.serialize(), resp));
-                                    }
-                                }
-                            }
-                            reply.success = true;
+                            // 2. Push partitions to every data node and verify delivery
+                            bool overall_success = true;
+                            std::string first_error;
+                            for (const auto& n : data_nodes) {
+                                cloudsql::network::PushDataArgs push_args;
+                                push_args.context_id = args.context_id;
+                                push_args.table_name = args.table_name;
+                                if (auto it = partitions.find(n.id); it != partitions.end()) {
+                                    push_args.rows = std::move(it->second);
+                                }
+
+                                cloudsql::network::RpcClient client(n.address, n.cluster_port);
+                                if (!client.connect()) {
+                                    overall_success = false;
+                                    if (first_error.empty()) {
+                                        first_error = "PushData connect failed for node: " + n.id;
+                                    }
+                                    continue;
+                                }
+                                std::vector<uint8_t> resp;
+                                if (!client.call(cloudsql::network::RpcType::PushData,
+                                                 push_args.serialize(), resp)) {
+                                    overall_success = false;
+                                    if (first_error.empty()) {
+                                        first_error = "PushData RPC failed for node: " + n.id;
+                                    }
+                                    continue;
+                                }
+                                auto push_reply =
+                                    cloudsql::network::QueryResultsReply::deserialize(resp);
+                                if (!push_reply.success) {
+                                    overall_success = false;
+                                    if (first_error.empty()) {
+                                        first_error = "PushData rejected by node: " + n.id +
+                                                      " - " + push_reply.error_msg;
+                                    }
+                                }
+                            }
+                            reply.success = overall_success;
+                            if (!overall_success) {
+                                reply.error_msg = first_error;
+                            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.cpp` around lines 432 - 458, The loop that pushes partitions
(variables: partitions, target_node, cloudsql::network::RpcClient, PushDataArgs)
currently ignores connection/call/response failures and unconditionally sets
reply.success = true; change it to record per-partition success (e.g., a bool
all_succeeded = true) and for each node set all_succeeded = false if
client.connect() fails or if the RPC call returns an error/invalid response, and
only set reply.success = true when all_succeeded remains true; make sure to
preserve/move rows only on successful send and propagate/log which
node/partition failed so callers can retry or surface an error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@src/main.cpp`:
- Around line 415-427: Guard against empty data_nodes and stabilize their
ordering before using index-based routing: after calling
cluster_manager->get_data_nodes(), check if data_nodes.empty() and handle that
case (return/error/skip) instead of calling ShardManager::compute_shard; also
produce a deterministic order (e.g., sort data_nodes by id) before computing
node_idx so data_nodes[node_idx].id is stable when populating partitions. Ensure
you reference data_nodes, cluster_manager->get_data_nodes(),
cloudsql::cluster::ShardManager::compute_shard, and partitions when implementing
the checks and ordering.
- Around line 432-458: The loop that pushes partitions (variables: partitions,
target_node, cloudsql::network::RpcClient, PushDataArgs) currently ignores
connection/call/response failures and unconditionally sets reply.success = true;
change it to record per-partition success (e.g., a bool all_succeeded = true)
and for each node set all_succeeded = false if client.connect() fails or if the
RPC call returns an error/invalid response, and only set reply.success = true
when all_succeeded remains true; make sure to preserve/move rows only on
successful send and propagate/log which node/partition failed so callers can
retry or surface an error.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c235da6 and a3dffe9.

📒 Files selected for processing (1)
  • src/main.cpp

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
src/distributed/distributed_executor.cpp (1)

459-465: Consider checking PushData RPC success in broadcast_table.

Unlike the shuffle logic which now validates push responses, broadcast_table ignores PushData failures. This could lead to incomplete data distribution during broadcasts.

♻️ Proposed fix to validate push responses
     for (const auto& node : data_nodes) {
         network::RpcClient client(node.address, node.cluster_port);
         if (client.connect()) {
             std::vector<uint8_t> resp_payload;
-            static_cast<void>(client.call(network::RpcType::PushData, push_payload, resp_payload));
+            if (!client.call(network::RpcType::PushData, push_payload, resp_payload)) {
+                return false;  // Or accumulate errors
+            }
+            auto reply = network::QueryResultsReply::deserialize(resp_payload);
+            if (!reply.success) {
+                return false;
+            }
+        } else {
+            return false;
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/distributed/distributed_executor.cpp` around lines 459 - 465, The
broadcast_table loop currently ignores the result of the PushData RPC; update
the logic in broadcast_table to capture and validate the RPC response from
network::RpcClient::call (used with network::RpcType::PushData and
push_payload), check the returned status/resp_payload for success, and handle
failures (e.g., log an error via the existing logger, mark the node as failed,
and return or propagate an error/boolean) instead of discarding the result;
ensure the updated behavior mirrors the shuffle logic's validation pattern so
incomplete broadcasts are detected and reported.
src/main.cpp (1)

450-488: Consider handling missing target node as an error.

The loop iterates over partitions which are pre-initialized from data_nodes, so target_node should always be found. However, if the node list changes between initialization (line 429-431) and the push loop (line 450), or if there's a logic error, the target_node == nullptr case silently succeeds for that partition.

♻️ Optional: Add defensive error handling
                            for (auto& [node_id, rows] : partitions) {
                                // Find node info
                                const cloudsql::cluster::NodeInfo* target_node = nullptr;
                                for (const auto& n : data_nodes) {
                                    if (n.id == node_id) {
                                        target_node = &n;
                                        break;
                                    }
                                }

-                               if (target_node != nullptr) {
+                               if (target_node == nullptr) {
+                                   overall_success = false;
+                                   delivery_errors += "Node not found: " + node_id + "; ";
+                                   continue;
+                               }
+
                                    cloudsql::network::RpcClient client(target_node->address,
                                                                        target_node->cluster_port);
                                    // ... rest of the code
-                               }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.cpp` around lines 450 - 488, The code currently ignores the case
where target_node remains nullptr inside the loop over partitions; update the
loop that finds target_node (iterating partitions and data_nodes) to treat a
missing target_node as a failure by setting overall_success = false and
appending a clear message to delivery_errors (include node_id and that the node
info was missing) before continuing to the next partition; ensure this uses the
same symbols (partitions, node_id, target_node, data_nodes, overall_success,
delivery_errors) so the defensive handling is applied whenever a node lookup
fails.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/distributed/distributed_executor.cpp`:
- Line 267: Replace the incorrect identifier GLOBAL_TX_ID with the correct
GLOBAL_TXN_ID where the transaction id is set (e.g., change the assignment to
args.txn_id = GLOBAL_TXN_ID); update any nearby uses of GLOBAL_TX_ID in
distributed_executor.cpp that refer to the global transaction identifier to use
GLOBAL_TXN_ID and rebuild to ensure the symbol resolves.

---

Nitpick comments:
In `@src/distributed/distributed_executor.cpp`:
- Around line 459-465: The broadcast_table loop currently ignores the result of
the PushData RPC; update the logic in broadcast_table to capture and validate
the RPC response from network::RpcClient::call (used with
network::RpcType::PushData and push_payload), check the returned
status/resp_payload for success, and handle failures (e.g., log an error via the
existing logger, mark the node as failed, and return or propagate an
error/boolean) instead of discarding the result; ensure the updated behavior
mirrors the shuffle logic's validation pattern so incomplete broadcasts are
detected and reported.

In `@src/main.cpp`:
- Around line 450-488: The code currently ignores the case where target_node
remains nullptr inside the loop over partitions; update the loop that finds
target_node (iterating partitions and data_nodes) to treat a missing target_node
as a failure by setting overall_success = false and appending a clear message to
delivery_errors (include node_id and that the node info was missing) before
continuing to the next partition; ensure this uses the same symbols (partitions,
node_id, target_node, data_nodes, overall_success, delivery_errors) so the
defensive handling is applied whenever a node lookup fails.

ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a3dffe9 and fd5bf3d.

📒 Files selected for processing (4)
  • include/network/rpc_message.hpp
  • src/distributed/distributed_executor.cpp
  • src/main.cpp
  • tests/distributed_tests.cpp

@poyrazK poyrazK merged commit eff4798 into main Mar 2, 2026
5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant