Skip to content

Commit 7904cae

Browse files
committed
fix: address review feedback - bloom filter validation, hoisting, docs
1 parent bfd846c commit 7904cae

4 files changed

Lines changed: 95 additions & 45 deletions

File tree

docs/performance/SQLITE_COMPARISON.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,15 @@ from join keys before buffering
7070
| `BloomFilter` class | `include/common/bloom_filter.hpp` | MurmurHash3-based bloom filter |
7171
| `BloomFilterArgs` RPC | `include/network/rpc_message.hpp` | Serialization for network transfer |
7272
| `ClusterManager` storage | `include/common/cluster_manager.hpp` | Stores bloom filter per context |
73-
| `PushData` handler | `src/main.cpp` | Applies bloom filter before buffering |
73+
| `PushData` handler | `src/main.cpp` | Receives and buffers filtered tuples |
74+
| `ShuffleFragment` handler | `src/main.cpp` | Applies bloom filter before sending |
7475
| Coordinator | `src/distributed/distributed_executor.cpp` | Broadcasts filter after Phase 1 |
7576

7677
### Test Coverage
7778
- 10 unit tests covering: BloomFilter class, BloomFilterArgs serialization, ClusterManager storage, filter application logic
7879
- Tests located in `tests/bloom_filter_test.cpp`
7980

80-
## 8. Future Roadmap
81+
## 7. Future Roadmap
8182
With the scan gap closed, our focus shifts to higher-level analytical throughput:
8283
* **Stage 1: SIMD-Accelerated Filtering**: Utilize AVX-512/NEON instructions to evaluate filter predicates on multiple rows per instruction.
8384
* **Stage 2: Vectorized Execution**: Move from row-at-a-time `TupleView` to batch-at-a-time `VectorBatch` processing.

src/common/bloom_filter.cpp

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,46 @@ BloomFilter::BloomFilter(const uint8_t* data, size_t size) {
8484
expected_elements_ = static_cast<size_t>(tmp_expected);
8585
offset += sizeof(uint64_t);
8686

87-
// Validate bit array size
88-
size_t bit_bytes = (num_bits_ + 7) / 8;
89-
if (size < offset + bit_bytes) {
90-
// Truncated payload - reset to safe empty state
87+
// Validate header fields before using them
88+
constexpr size_t MAX_BITS = (1ULL << 40); // 2^40 bits (~128 GiB of bit array), generous upper bound
89+
constexpr size_t MAX_HASHES = 64; // reasonable upper bound
90+
constexpr size_t MAX_EXPECTED = (1ULL << 30); // ~1B elements max
91+
92+
if (num_bits_ == 0 || num_bits_ > MAX_BITS) {
93+
num_bits_ = 0;
94+
num_hashes_ = 0;
95+
expected_elements_ = 0;
96+
bits_.clear();
97+
return;
98+
}
99+
if (num_hashes_ > MAX_HASHES) {
100+
num_bits_ = 0;
101+
num_hashes_ = 0;
102+
expected_elements_ = 0;
103+
bits_.clear();
104+
return;
105+
}
106+
if (expected_elements_ > MAX_EXPECTED) {
107+
num_bits_ = 0;
108+
num_hashes_ = 0;
109+
expected_elements_ = 0;
110+
bits_.clear();
111+
return;
112+
}
113+
114+
// Validate bit array size and overflow safety
115+
size_t bit_bytes = 0;
116+
if (num_bits_ > (SIZE_MAX - 7) / 8) {
117+
num_bits_ = 0;
118+
num_hashes_ = 0;
119+
expected_elements_ = 0;
120+
bits_.clear();
121+
return;
122+
}
123+
bit_bytes = (num_bits_ + 7) / 8;
124+
125+
// Check that bit_bytes fits in remaining payload
126+
if (bit_bytes > size || offset > size || bit_bytes > size - offset) {
91127
num_bits_ = 0;
92128
num_hashes_ = 0;
93129
expected_elements_ = 0;

src/main.cpp

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -499,8 +499,8 @@ int main(int argc, char* argv[]) {
499499
if (cluster_manager != nullptr) {
500500
cluster_manager->set_bloom_filter(
501501
args.context_id, args.build_table, args.probe_table,
502-
args.probe_key_col, args.filter_data, args.expected_elements,
503-
args.num_hashes);
502+
args.probe_key_col, std::move(args.filter_data),
503+
args.expected_elements, args.num_hashes);
504504
}
505505
cloudsql::network::QueryResultsReply reply;
506506
reply.success = true;
@@ -571,6 +571,30 @@ int main(int argc, char* argv[]) {
571571
bool overall_success = true;
572572
std::string delivery_errors;
573573

574+
// Hoist bloom filter and key resolution out of per-destination loop
575+
cloudsql::common::BloomFilter bloom;
576+
bool have_bloom = false;
577+
size_t key_idx = static_cast<size_t>(-1);
578+
579+
if (cluster_manager->has_bloom_filter(args.context_id)) {
580+
bloom = cluster_manager->get_bloom_filter(args.context_id);
581+
std::string probe_key_col = cluster_manager->get_probe_key_col(args.context_id);
582+
583+
if (!probe_key_col.empty()) {
584+
auto table_meta_opt = catalog->get_table_by_name(args.table_name);
585+
if (table_meta_opt.has_value()) {
586+
const auto* table_meta = table_meta_opt.value();
587+
for (size_t i = 0; i < table_meta->columns.size(); ++i) {
588+
if (table_meta->columns[i].name == probe_key_col) {
589+
key_idx = i;
590+
break;
591+
}
592+
}
593+
}
594+
}
595+
have_bloom = (key_idx != static_cast<size_t>(-1));
596+
}
597+
574598
for (auto& [node_id, rows] : partitions) {
575599
const cloudsql::cluster::NodeInfo* target_node = nullptr;
576600
for (const auto& n : data_nodes) {
@@ -590,42 +614,16 @@ int main(int argc, char* argv[]) {
590614
}
591615

592616
// Apply bloom filter on sender side before sending
593-
std::vector<cloudsql::executor::Tuple> rows_to_send =
594-
std::move(rows);
595-
if (cluster_manager->has_bloom_filter(args.context_id)) {
596-
auto bloom =
597-
cluster_manager->get_bloom_filter(args.context_id);
598-
std::string probe_key_col =
599-
cluster_manager->get_probe_key_col(args.context_id);
600-
601-
if (!probe_key_col.empty()) {
602-
// Find key column index in current table
603-
auto table_meta_opt =
604-
catalog->get_table_by_name(args.table_name);
605-
if (table_meta_opt.has_value()) {
606-
const auto* table_meta = table_meta_opt.value();
607-
size_t key_idx = static_cast<size_t>(-1);
608-
for (size_t i = 0; i < table_meta->columns.size();
609-
++i) {
610-
if (table_meta->columns[i].name ==
611-
probe_key_col) {
612-
key_idx = i;
613-
break;
614-
}
615-
}
616-
617-
if (key_idx != static_cast<size_t>(-1)) {
618-
std::vector<cloudsql::executor::Tuple> filtered;
619-
filtered.reserve(rows_to_send.size());
620-
for (auto& row : rows_to_send) {
621-
if (bloom.might_contain(row.get(key_idx))) {
622-
filtered.push_back(std::move(row));
623-
}
624-
}
625-
rows_to_send = std::move(filtered);
626-
}
617+
std::vector<cloudsql::executor::Tuple> rows_to_send = std::move(rows);
618+
if (have_bloom) {
619+
std::vector<cloudsql::executor::Tuple> filtered;
620+
filtered.reserve(rows_to_send.size());
621+
for (auto& row : rows_to_send) {
622+
if (bloom.might_contain(row.get(key_idx))) {
623+
filtered.push_back(std::move(row));
627624
}
628625
}
626+
rows_to_send = std::move(filtered);
629627
}
630628

631629
cloudsql::network::PushDataArgs push_args;

tests/bloom_filter_test.cpp

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,14 +165,21 @@ TEST(BloomFilterTests, DifferentValueTypes) {
165165
* @brief Tests BloomFilterArgs serialization round-trip.
166166
*/
167167
TEST(BloomFilterTests, BloomFilterArgsSerialization) {
168+
// Create a real bloom filter and use its serialized form
169+
BloomFilter original(50);
170+
original.insert(Value::make_int64(10));
171+
original.insert(Value::make_int64(20));
172+
original.insert(Value::make_text("hello"));
173+
std::vector<uint8_t> real_filter_data = original.serialize();
174+
168175
BloomFilterArgs args;
169176
args.context_id = "ctx_123";
170177
args.build_table = "users";
171178
args.probe_table = "orders";
172179
args.probe_key_col = "user_id";
173-
args.filter_data = {0x01, 0x02, 0x03};
174-
args.expected_elements = 1000;
175-
args.num_hashes = 4;
180+
args.filter_data = real_filter_data;
181+
args.expected_elements = original.expected_elements();
182+
args.num_hashes = original.num_hashes();
176183

177184
auto serialized = args.serialize();
178185
auto deserialized = BloomFilterArgs::deserialize(serialized);
@@ -185,6 +192,14 @@ TEST(BloomFilterTests, BloomFilterArgsSerialization) {
185192
EXPECT_EQ(args.num_hashes, deserialized.num_hashes);
186193
ASSERT_EQ(args.filter_data.size(), deserialized.filter_data.size());
187194
EXPECT_EQ(args.filter_data, deserialized.filter_data);
195+
196+
// Reconstruct bloom filter from deserialized data and verify it works
197+
BloomFilter reconstructed(deserialized.filter_data.data(), deserialized.filter_data.size());
198+
EXPECT_EQ(reconstructed.expected_elements(), original.expected_elements());
199+
EXPECT_EQ(reconstructed.num_hashes(), original.num_hashes());
200+
EXPECT_TRUE(reconstructed.might_contain(Value::make_int64(10)));
201+
EXPECT_TRUE(reconstructed.might_contain(Value::make_int64(20)));
202+
EXPECT_TRUE(reconstructed.might_contain(Value::make_text("hello")));
188203
}
189204

190205
/**

0 commit comments

Comments
 (0)