datafusion_physical_plan/joins/hash_join/shared_bounds.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Utilities for collecting and sharing build-side join key bounds (min/max) across hash join partitions. Used in dynamic filter pushdown in Hash Joins.
19// TODO: include the link to the Dynamic Filter blog post.
20
21use std::fmt;
22use std::sync::Arc;
23
24use crate::joins::PartitionMode;
25use crate::ExecutionPlan;
26use crate::ExecutionPlanProperties;
27
28use datafusion_common::{Result, ScalarValue};
29use datafusion_expr::Operator;
30use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
31use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
32
33use itertools::Itertools;
34use parking_lot::Mutex;
35use tokio::sync::Barrier;
36
37/// Represents the minimum and maximum values for a specific column.
38/// Used in dynamic filter pushdown to establish value boundaries.
39#[derive(Debug, Clone, PartialEq)]
40pub(crate) struct ColumnBounds {
41 /// The minimum value observed for this column
42 min: ScalarValue,
43 /// The maximum value observed for this column
44 max: ScalarValue,
45}
46
47impl ColumnBounds {
48 pub(crate) fn new(min: ScalarValue, max: ScalarValue) -> Self {
49 Self { min, max }
50 }
51}
52
53/// Represents the bounds for all join key columns from a single partition.
54/// This contains the min/max values computed from one partition's build-side data.
55#[derive(Debug, Clone)]
56pub(crate) struct PartitionBounds {
57 /// Partition identifier for debugging and determinism (not strictly necessary)
58 partition: usize,
59 /// Min/max bounds for each join key column in this partition.
60 /// Index corresponds to the join key expression index.
61 column_bounds: Vec<ColumnBounds>,
62}
63
64impl PartitionBounds {
65 pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
66 Self {
67 partition,
68 column_bounds,
69 }
70 }
71
72 pub(crate) fn len(&self) -> usize {
73 self.column_bounds.len()
74 }
75
76 pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> {
77 self.column_bounds.get(index)
78 }
79}
80
81/// Coordinates dynamic filter bounds collection across multiple partitions
82///
83/// This structure ensures that dynamic filters are built with complete information from all
84/// relevant partitions before being applied to probe-side scans. Incomplete filters would
85/// incorrectly eliminate valid join results.
86///
87/// ## Synchronization Strategy
88///
89/// 1. Each partition computes bounds from its build-side data
90/// 2. Bounds are stored in the shared vector
91/// 3. A barrier tracks how many partitions have reported their bounds
92/// 4. When the last partition reports, bounds are merged and the filter is updated exactly once
93///
94/// ## Partition Counting
95///
96/// The `total_partitions` count represents how many times `collect_build_side` will be called:
97/// - **CollectLeft**: Number of output partitions (each accesses shared build data)
98/// - **Partitioned**: Number of input partitions (each builds independently)
99///
100/// ## Thread Safety
101///
102/// All fields use a single mutex to ensure correct coordination between concurrent
103/// partition executions.
104pub(crate) struct SharedBoundsAccumulator {
105 /// Shared state protected by a single mutex to avoid ordering concerns
106 inner: Mutex<SharedBoundsState>,
107 barrier: Barrier,
108 /// Dynamic filter for pushdown to probe side
109 dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
110 /// Right side join expressions needed for creating filter bounds
111 on_right: Vec<PhysicalExprRef>,
112}
113
114/// State protected by SharedBoundsAccumulator's mutex
115struct SharedBoundsState {
116 /// Bounds from completed partitions.
117 /// Each element represents the column bounds computed by one partition.
118 bounds: Vec<PartitionBounds>,
119}
120
121impl SharedBoundsAccumulator {
122 /// Creates a new SharedBoundsAccumulator configured for the given partition mode
123 ///
124 /// This method calculates how many times `collect_build_side` will be called based on the
125 /// partition mode's execution pattern. This count is critical for determining when we have
126 /// complete information from all partitions to build the dynamic filter.
127 ///
128 /// ## Partition Mode Execution Patterns
129 ///
130 /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
131 /// across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
132 /// Although this results in multiple invocations, the `report_partition_bounds` function contains deduplication logic to handle them safely.
133 /// Expected calls = number of output partitions.
134 ///
135 ///
136 /// - **Partitioned**: Each partition independently builds its own hash table by calling
137 /// `collect_build_side` once. Expected calls = number of build partitions.
138 ///
139 /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since
140 /// the actual mode will be determined and a new bounds_accumulator created before execution.
141 ///
142 /// ## Why This Matters
143 ///
144 /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
145 /// valid join results. We must wait until we have complete bounds information from ALL
146 /// relevant partitions before updating the dynamic filter.
147 pub(crate) fn new_from_partition_mode(
148 partition_mode: PartitionMode,
149 left_child: &dyn ExecutionPlan,
150 right_child: &dyn ExecutionPlan,
151 dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
152 on_right: Vec<PhysicalExprRef>,
153 ) -> Self {
154 // Troubleshooting: If partition counts are incorrect, verify this logic matches
155 // the actual execution pattern in collect_build_side()
156 let expected_calls = match partition_mode {
157 // Each output partition accesses shared build data
158 PartitionMode::CollectLeft => {
159 right_child.output_partitioning().partition_count()
160 }
161 // Each partition builds its own data
162 PartitionMode::Partitioned => {
163 left_child.output_partitioning().partition_count()
164 }
165 // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
166 PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
167 };
168 Self {
169 inner: Mutex::new(SharedBoundsState {
170 bounds: Vec::with_capacity(expected_calls),
171 }),
172 barrier: Barrier::new(expected_calls),
173 dynamic_filter,
174 on_right,
175 }
176 }
177
178 /// Create a filter expression from individual partition bounds using OR logic.
179 ///
180 /// This creates a filter where each partition's bounds form a conjunction (AND)
181 /// of column range predicates, and all partitions are combined with OR.
182 ///
183 /// For example, with 2 partitions and 2 columns:
184 /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
185 /// OR
186 /// (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
187 pub(crate) fn create_filter_from_partition_bounds(
188 &self,
189 bounds: &[PartitionBounds],
190 ) -> Result<Arc<dyn PhysicalExpr>> {
191 if bounds.is_empty() {
192 return Ok(lit(true));
193 }
194
195 // Create a predicate for each partition
196 let mut partition_predicates = Vec::with_capacity(bounds.len());
197
198 for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
199 // Create range predicates for each join key in this partition
200 let mut column_predicates = Vec::with_capacity(partition_bounds.len());
201
202 for (col_idx, right_expr) in self.on_right.iter().enumerate() {
203 if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) {
204 // Create predicate: col >= min AND col <= max
205 let min_expr = Arc::new(BinaryExpr::new(
206 Arc::clone(right_expr),
207 Operator::GtEq,
208 lit(column_bounds.min.clone()),
209 )) as Arc<dyn PhysicalExpr>;
210 let max_expr = Arc::new(BinaryExpr::new(
211 Arc::clone(right_expr),
212 Operator::LtEq,
213 lit(column_bounds.max.clone()),
214 )) as Arc<dyn PhysicalExpr>;
215 let range_expr =
216 Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr))
217 as Arc<dyn PhysicalExpr>;
218 column_predicates.push(range_expr);
219 }
220 }
221
222 // Combine all column predicates for this partition with AND
223 if !column_predicates.is_empty() {
224 let partition_predicate = column_predicates
225 .into_iter()
226 .reduce(|acc, pred| {
227 Arc::new(BinaryExpr::new(acc, Operator::And, pred))
228 as Arc<dyn PhysicalExpr>
229 })
230 .unwrap();
231 partition_predicates.push(partition_predicate);
232 }
233 }
234
235 // Combine all partition predicates with OR
236 let combined_predicate = partition_predicates
237 .into_iter()
238 .reduce(|acc, pred| {
239 Arc::new(BinaryExpr::new(acc, Operator::Or, pred))
240 as Arc<dyn PhysicalExpr>
241 })
242 .unwrap_or_else(|| lit(true));
243
244 Ok(combined_predicate)
245 }
246
247 /// Report bounds from a completed partition and update dynamic filter if all partitions are done
248 ///
249 /// This method coordinates the dynamic filter updates across all partitions. It stores the
250 /// bounds from the current partition, increments the completion counter, and when all
251 /// partitions have reported, creates an OR'd filter from individual partition bounds.
252 ///
253 /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
254 /// to report their bounds. Once that occurs, the method will resolve for all callers and the
255 /// dynamic filter will be updated exactly once.
256 ///
257 /// # Note
258 ///
259 /// As barriers are reusable, it is likely an error to call this method more times than the
260 /// total number of partitions - as it can lead to pending futures that never resolve. We rely
261 /// on correct usage from the caller rather than imposing additional checks here. If this is a concern,
262 /// consider making the resulting future shared so the ready result can be reused.
263 ///
264 /// # Arguments
265 /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
266 /// * `partition_bounds` - The bounds computed by this partition (if any)
267 ///
268 /// # Returns
269 /// * `Result<()>` - Ok if successful, Err if filter update failed
270 pub(crate) async fn report_partition_bounds(
271 &self,
272 left_side_partition_id: usize,
273 partition_bounds: Option<Vec<ColumnBounds>>,
274 ) -> Result<()> {
275 // Store bounds in the accumulator - this runs once per partition
276 if let Some(bounds) = partition_bounds {
277 let mut guard = self.inner.lock();
278
279 let should_push = if let Some(last_bound) = guard.bounds.last() {
280 // In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
281 // Since this function can be called multiple times for that same partition, we must deduplicate
282 // by checking against the last recorded bound.
283 last_bound.partition != left_side_partition_id
284 } else {
285 true
286 };
287
288 if should_push {
289 guard
290 .bounds
291 .push(PartitionBounds::new(left_side_partition_id, bounds));
292 }
293 }
294
295 if self.barrier.wait().await.is_leader() {
296 // All partitions have reported, so we can update the filter
297 let inner = self.inner.lock();
298 if !inner.bounds.is_empty() {
299 let filter_expr =
300 self.create_filter_from_partition_bounds(&inner.bounds)?;
301 self.dynamic_filter.update(filter_expr)?;
302 }
303 }
304
305 Ok(())
306 }
307}
308
309impl fmt::Debug for SharedBoundsAccumulator {
310 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311 write!(f, "SharedBoundsAccumulator")
312 }
313}