Skip to content

Commit 767b8d1

Browse files
committed
perf: SIMD short-circuit in JoinHashMap probe
[AURON-2160] Optimize join hash map probe by checking hash_matched first before computing empty mask. This reduces ~50% SIMD instructions when hash hit rate is high (typical join scenarios). Before: Always compute both hash_matched and empty SIMD masks. After: Only compute empty mask when hash_matched has no hits. Also add a criterion microbenchmark (benches/join_hash_map.rs) covering realistic BHJ build sizes (5M/10M/20M keys) × three hit rates (0/50/100%). Results on Apple M2 Pro (probe_size=4096): build size | hit=0% | hit=50% | hit=100% ----------------+---------+---------+--------- 5M (~128 MB) | 6.63 µs | 6.52 µs | 6.35 µs 10M (~256 MB) | 6.68 µs | 6.50 µs | 6.36 µs 20M (~512 MB) | 6.70 µs | 6.59 µs | 6.36 µs Latency stays flat because prefetch_read_data (4-step ahead) fully pipelines cache misses. The hit=100% path is consistently ~4-5% faster, aligning with the optimization goal. Instruction-count savings can be confirmed on x86 via: perf stat -e instructions Run benchmark: cargo bench --bench join_hash_map -p datafusion-ext-plans
1 parent 016c7a4 commit 767b8d1

9 files changed

Lines changed: 1066 additions & 576 deletions

File tree

Cargo.lock

Lines changed: 925 additions & 569 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ parking_lot = "0.12.5"
182182
paste = "1.0.15"
183183
procfs = "0.18.0"
184184
prost = "0.14.3"
185+
criterion = { version = "0.5", features = ["html_reports"] }
185186
prost-types = "0.14.3"
186187
prost-reflect = "0.16.3"
187188
rand = "0.9.3"

native-engine/datafusion-ext-plans/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,4 +73,9 @@ sonic-rs = { workspace = true }
7373
procfs = { workspace = true }
7474

7575
[dev-dependencies]
76+
criterion = { workspace = true }
7677
rand = { workspace = true }
78+
79+
[[bench]]
80+
name = "join_hash_map"
81+
harness = false
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
//! Microbenchmarks for JoinHashMap::lookup_many.
17+
//!
18+
//! Sweeps three build-side map sizes — all well beyond L3, so probes are
//! cache-cold and the SIMD/prefetch behavior dominates:
//! - 5M keys (~128 MB map)
//! - 10M keys (~256 MB map)
//! - 20M keys (~512 MB map)
22+
//!
23+
//! For each map size, three probe hit rates are measured:
24+
//! - 0% (all misses)
25+
//! - 50% (half hit, half miss)
26+
//! - 100% (all hits — typical inner-join scenario)
27+
//!
28+
//! Probe batch size is fixed at 4096 to match a typical Spark batch.
29+
30+
use std::sync::Arc;
31+
32+
use arrow::{
33+
array::Int32Array,
34+
datatypes::{DataType, Field, Schema},
35+
record_batch::RecordBatch,
36+
};
37+
use criterion::{BenchmarkId, Criterion, black_box, criterion_group, criterion_main};
38+
use datafusion_ext_plans::joins::join_hash_map::JoinHashMap;
39+
40+
/// Probe batch size — matches the default Spark vectorized batch size.
/// Used by `make_probe_hashes` to size every probe batch.
const PROBE_SIZE: usize = 4096;

/// Build-side hash base: top bit always set so hashes are non-zero
/// (matches join_create_hashes convention). Build hash `i` is
/// `BUILD_HASH_BASE + i`.
const BUILD_HASH_BASE: u32 = 0x8000_0001;
46+
47+
fn make_map(build_size: usize) -> (JoinHashMap, Vec<u32>) {
48+
let values: Vec<i32> = (0..build_size as i32).collect();
49+
let array = Arc::new(Int32Array::from(values));
50+
let schema = Arc::new(Schema::new(vec![Field::new("k", DataType::Int32, false)]));
51+
let batch = RecordBatch::try_new(schema, vec![array.clone() as _]).expect("build batch");
52+
53+
let build_hashes: Vec<u32> = (0..build_size as u32)
54+
.map(|i| BUILD_HASH_BASE.wrapping_add(i))
55+
.collect();
56+
57+
let map = JoinHashMap::create_from_data_batch_and_hashes(
58+
batch,
59+
vec![array as _],
60+
build_hashes.clone(),
61+
)
62+
.expect("build map");
63+
64+
(map, build_hashes)
65+
}
66+
67+
/// Generate PROBE_SIZE hashes at the given hit rate.
68+
/// Hits and misses are interleaved evenly (Bresenham) to avoid cache grouping
69+
/// bias.
70+
fn make_probe_hashes(build_hashes: &[u32], build_size: usize, hit_rate: f64) -> Vec<u32> {
71+
// Miss hashes start well above the build range to avoid accidental collision.
72+
let miss_base: u32 = BUILD_HASH_BASE.wrapping_add(build_size as u32 + 0x0010_0000);
73+
let build_len = build_hashes.len();
74+
(0..PROBE_SIZE)
75+
.map(|i| {
76+
let cumulative = ((i + 1) as f64 * hit_rate) as usize;
77+
let prev = (i as f64 * hit_rate) as usize;
78+
if cumulative > prev {
79+
build_hashes[i % build_len]
80+
} else {
81+
miss_base.wrapping_add(i as u32)
82+
}
83+
})
84+
.collect()
85+
}
86+
87+
fn bench_lookup_many(c: &mut Criterion) {
88+
// (label, build_size)
89+
// map memory ≈ (build_size * 2 / 8).next_power_of_two() * 64 bytes
90+
// 5M → ~128 MB (realistic BHJ, ~50 MB serialized data)
91+
// 10M → ~256 MB (realistic BHJ, ~100–200 MB serialized data)
92+
// 20M → ~512 MB (realistic BHJ, ~1 GB serialized data)
93+
let build_sizes: &[(&str, usize)] = &[
94+
("build=5M", 5_000_000),
95+
("build=10M", 10_000_000),
96+
("build=20M", 20_000_000),
97+
];
98+
let hit_rates: &[(&str, f64)] = &[("hit=0%", 0.0), ("hit=50%", 0.5), ("hit=100%", 1.0)];
99+
100+
let mut group = c.benchmark_group("JoinHashMap::lookup_many");
101+
102+
for &(size_label, build_size) in build_sizes {
103+
let (map, build_hashes) = make_map(build_size);
104+
for &(rate_label, hit_rate) in hit_rates {
105+
let probe = make_probe_hashes(&build_hashes, build_size, hit_rate);
106+
let label = format!("{size_label}/{rate_label}");
107+
group.bench_with_input(BenchmarkId::from_parameter(&label), &label, |b, _| {
108+
b.iter(|| {
109+
let result = map.lookup_many(black_box(probe.clone()));
110+
black_box(result)
111+
});
112+
});
113+
}
114+
}
115+
116+
group.finish();
117+
}
118+
119+
// Register the benchmark entry point with criterion's harness
// (requires `harness = false` for this bench target in Cargo.toml).
criterion_group!(benches, bench_lookup_many);
criterion_main!(benches);

native-engine/datafusion-ext-plans/src/agg/acc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -536,8 +536,8 @@ impl AccColumn for AccBytesColumn {
536536
if array_data.is_valid(i) {
537537
let offset_begin = offset_buffer[i] as usize;
538538
let offset_end = offset_buffer[i + 1] as usize;
539-
self.items.push(Some(AccBytes::from_slice(
540-
&data_buffer[offset_begin..offset_end],
539+
self.items.push(Some(AccBytes::from(
540+
data_buffer[offset_begin..offset_end].as_ref(),
541541
)));
542542
} else {
543543
self.items.push(None);

native-engine/datafusion-ext-plans/src/agg/agg_hash_map.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl AggHashMapKey for &[u8] {
179179
}
180180

181181
fn into_owned(self) -> OwnedKey {
182-
OwnedKey::from_slice(self)
182+
OwnedKey::from(self.as_ref())
183183
}
184184
}
185185

native-engine/datafusion-ext-plans/src/agg/first.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl Agg for AggFirst {
118118
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
119119
if flag_accs.value(acc_idx).is_none() {
120120
if partial_arg.is_valid(partial_arg_idx) {
121-
value_accs.set_value(acc_idx, Some(AccBytes::from(partial_arg.value(partial_arg_idx).as_ref())));
121+
value_accs.set_value(acc_idx, Some(AccBytes::from(partial_arg.value(partial_arg_idx).as_ref() as &[u8])));
122122
flag_accs.set_value(acc_idx, Some(true));
123123
} else {
124124
value_accs.set_value(acc_idx, None);

native-engine/datafusion-ext-plans/src/agg/first_ignores_null.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ impl Agg for AggFirstIgnoresNull {
107107
idx_for_zipped! {
108108
((acc_idx, partial_arg_idx) in (acc_idx, partial_arg_idx)) => {
109109
if accs.value(acc_idx).is_none() && partial_arg.is_valid(partial_arg_idx) {
110-
accs.set_value(acc_idx, Some(AccBytes::from(partial_arg.value(partial_arg_idx).as_ref())));
110+
accs.set_value(acc_idx, Some(AccBytes::from(partial_arg.value(partial_arg_idx).as_ref() as &[u8])));
111111
}
112112
}
113113
}

native-engine/datafusion-ext-plans/src/joins/join_hash_map.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,15 +253,23 @@ impl Table {
253253
let mut e = entries![i] as usize;
254254
loop {
255255
let hash_matched = self.map[e].hashes.simd_eq(Simd::splat(hashes[i]));
256-
let empty = self.map[e].hashes.simd_eq(Simd::splat(0));
257256

258-
if let Some(pos) = (hash_matched | empty).first_set() {
257+
// Fast path: check hash match first (common case)
258+
if let Some(pos) = hash_matched.first_set() {
259259
hashes[i] = unsafe {
260260
// safety: transmute MapValue(u32) to u32
261261
std::mem::transmute(self.map[e].values[pos])
262262
};
263263
break;
264264
}
265+
266+
// Slow path: check empty slot only when no match
267+
let empty = self.map[e].hashes.simd_eq(Simd::splat(0));
268+
if empty.any() {
269+
hashes[i] = MapValue::EMPTY.0;
270+
break;
271+
}
272+
265273
e += 1;
266274
e %= 1 << self.map_mod_bits;
267275
}

0 commit comments

Comments
 (0)