datafusion_physical_plan/joins/hash_join/
shared_bounds.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Utilities for shared bounds. Used in dynamic filter pushdown in Hash Joins.
19// TODO: include the link to the Dynamic Filter blog post.
20
21use std::fmt;
22use std::sync::Arc;
23
24use crate::joins::PartitionMode;
25use crate::ExecutionPlan;
26use crate::ExecutionPlanProperties;
27
28use datafusion_common::{Result, ScalarValue};
29use datafusion_expr::Operator;
30use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
31use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
32
33use itertools::Itertools;
34use parking_lot::Mutex;
35use tokio::sync::Barrier;
36
37/// Represents the minimum and maximum values for a specific column.
38/// Used in dynamic filter pushdown to establish value boundaries.
39#[derive(Debug, Clone, PartialEq)]
40pub(crate) struct ColumnBounds {
41    /// The minimum value observed for this column
42    min: ScalarValue,
43    /// The maximum value observed for this column  
44    max: ScalarValue,
45}
46
47impl ColumnBounds {
48    pub(crate) fn new(min: ScalarValue, max: ScalarValue) -> Self {
49        Self { min, max }
50    }
51}
52
53/// Represents the bounds for all join key columns from a single partition.
54/// This contains the min/max values computed from one partition's build-side data.
55#[derive(Debug, Clone)]
56pub(crate) struct PartitionBounds {
57    /// Partition identifier for debugging and determinism (not strictly necessary)
58    partition: usize,
59    /// Min/max bounds for each join key column in this partition.
60    /// Index corresponds to the join key expression index.
61    column_bounds: Vec<ColumnBounds>,
62}
63
64impl PartitionBounds {
65    pub(crate) fn new(partition: usize, column_bounds: Vec<ColumnBounds>) -> Self {
66        Self {
67            partition,
68            column_bounds,
69        }
70    }
71
72    pub(crate) fn len(&self) -> usize {
73        self.column_bounds.len()
74    }
75
76    pub(crate) fn get_column_bounds(&self, index: usize) -> Option<&ColumnBounds> {
77        self.column_bounds.get(index)
78    }
79}
80
81/// Coordinates dynamic filter bounds collection across multiple partitions
82///
83/// This structure ensures that dynamic filters are built with complete information from all
84/// relevant partitions before being applied to probe-side scans. Incomplete filters would
85/// incorrectly eliminate valid join results.
86///
87/// ## Synchronization Strategy
88///
89/// 1. Each partition computes bounds from its build-side data
90/// 2. Bounds are stored in the shared vector
91/// 3. A barrier tracks how many partitions have reported their bounds
92/// 4. When the last partition reports, bounds are merged and the filter is updated exactly once
93///
94/// ## Partition Counting
95///
96/// The `total_partitions` count represents how many times `collect_build_side` will be called:
97/// - **CollectLeft**: Number of output partitions (each accesses shared build data)
98/// - **Partitioned**: Number of input partitions (each builds independently)
99///
100/// ## Thread Safety
101///
102/// All fields use a single mutex to ensure correct coordination between concurrent
103/// partition executions.
104pub(crate) struct SharedBoundsAccumulator {
105    /// Shared state protected by a single mutex to avoid ordering concerns
106    inner: Mutex<SharedBoundsState>,
107    barrier: Barrier,
108    /// Dynamic filter for pushdown to probe side
109    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
110    /// Right side join expressions needed for creating filter bounds
111    on_right: Vec<PhysicalExprRef>,
112}
113
114/// State protected by SharedBoundsAccumulator's mutex
115struct SharedBoundsState {
116    /// Bounds from completed partitions.
117    /// Each element represents the column bounds computed by one partition.
118    bounds: Vec<PartitionBounds>,
119}
120
121impl SharedBoundsAccumulator {
122    /// Creates a new SharedBoundsAccumulator configured for the given partition mode
123    ///
124    /// This method calculates how many times `collect_build_side` will be called based on the
125    /// partition mode's execution pattern. This count is critical for determining when we have
126    /// complete information from all partitions to build the dynamic filter.
127    ///
128    /// ## Partition Mode Execution Patterns
129    ///
130    /// - **CollectLeft**: Build side is collected ONCE from partition 0 and shared via `OnceFut`
131    ///   across all output partitions. Each output partition calls `collect_build_side` to access the shared build data.
132    ///   Although this results in multiple invocations, the  `report_partition_bounds` function contains deduplication logic to handle them safely.
133    ///   Expected calls = number of output partitions.
134    ///
135    ///
136    /// - **Partitioned**: Each partition independently builds its own hash table by calling
137    ///   `collect_build_side` once. Expected calls = number of build partitions.
138    ///
139    /// - **Auto**: Placeholder mode resolved during optimization. Uses 1 as safe default since
140    ///   the actual mode will be determined and a new bounds_accumulator created before execution.
141    ///
142    /// ## Why This Matters
143    ///
144    /// We cannot build a partial filter from some partitions - it would incorrectly eliminate
145    /// valid join results. We must wait until we have complete bounds information from ALL
146    /// relevant partitions before updating the dynamic filter.
147    pub(crate) fn new_from_partition_mode(
148        partition_mode: PartitionMode,
149        left_child: &dyn ExecutionPlan,
150        right_child: &dyn ExecutionPlan,
151        dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
152        on_right: Vec<PhysicalExprRef>,
153    ) -> Self {
154        // Troubleshooting: If partition counts are incorrect, verify this logic matches
155        // the actual execution pattern in collect_build_side()
156        let expected_calls = match partition_mode {
157            // Each output partition accesses shared build data
158            PartitionMode::CollectLeft => {
159                right_child.output_partitioning().partition_count()
160            }
161            // Each partition builds its own data
162            PartitionMode::Partitioned => {
163                left_child.output_partitioning().partition_count()
164            }
165            // Default value, will be resolved during optimization (does not exist once `execute()` is called; will be replaced by one of the other two)
166            PartitionMode::Auto => unreachable!("PartitionMode::Auto should not be present at execution time. This is a bug in DataFusion, please report it!"),
167        };
168        Self {
169            inner: Mutex::new(SharedBoundsState {
170                bounds: Vec::with_capacity(expected_calls),
171            }),
172            barrier: Barrier::new(expected_calls),
173            dynamic_filter,
174            on_right,
175        }
176    }
177
178    /// Create a filter expression from individual partition bounds using OR logic.
179    ///
180    /// This creates a filter where each partition's bounds form a conjunction (AND)
181    /// of column range predicates, and all partitions are combined with OR.
182    ///
183    /// For example, with 2 partitions and 2 columns:
184    /// ((col0 >= p0_min0 AND col0 <= p0_max0 AND col1 >= p0_min1 AND col1 <= p0_max1)
185    ///  OR
186    ///  (col0 >= p1_min0 AND col0 <= p1_max0 AND col1 >= p1_min1 AND col1 <= p1_max1))
187    pub(crate) fn create_filter_from_partition_bounds(
188        &self,
189        bounds: &[PartitionBounds],
190    ) -> Result<Arc<dyn PhysicalExpr>> {
191        if bounds.is_empty() {
192            return Ok(lit(true));
193        }
194
195        // Create a predicate for each partition
196        let mut partition_predicates = Vec::with_capacity(bounds.len());
197
198        for partition_bounds in bounds.iter().sorted_by_key(|b| b.partition) {
199            // Create range predicates for each join key in this partition
200            let mut column_predicates = Vec::with_capacity(partition_bounds.len());
201
202            for (col_idx, right_expr) in self.on_right.iter().enumerate() {
203                if let Some(column_bounds) = partition_bounds.get_column_bounds(col_idx) {
204                    // Create predicate: col >= min AND col <= max
205                    let min_expr = Arc::new(BinaryExpr::new(
206                        Arc::clone(right_expr),
207                        Operator::GtEq,
208                        lit(column_bounds.min.clone()),
209                    )) as Arc<dyn PhysicalExpr>;
210                    let max_expr = Arc::new(BinaryExpr::new(
211                        Arc::clone(right_expr),
212                        Operator::LtEq,
213                        lit(column_bounds.max.clone()),
214                    )) as Arc<dyn PhysicalExpr>;
215                    let range_expr =
216                        Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr))
217                            as Arc<dyn PhysicalExpr>;
218                    column_predicates.push(range_expr);
219                }
220            }
221
222            // Combine all column predicates for this partition with AND
223            if !column_predicates.is_empty() {
224                let partition_predicate = column_predicates
225                    .into_iter()
226                    .reduce(|acc, pred| {
227                        Arc::new(BinaryExpr::new(acc, Operator::And, pred))
228                            as Arc<dyn PhysicalExpr>
229                    })
230                    .unwrap();
231                partition_predicates.push(partition_predicate);
232            }
233        }
234
235        // Combine all partition predicates with OR
236        let combined_predicate = partition_predicates
237            .into_iter()
238            .reduce(|acc, pred| {
239                Arc::new(BinaryExpr::new(acc, Operator::Or, pred))
240                    as Arc<dyn PhysicalExpr>
241            })
242            .unwrap_or_else(|| lit(true));
243
244        Ok(combined_predicate)
245    }
246
247    /// Report bounds from a completed partition and update dynamic filter if all partitions are done
248    ///
249    /// This method coordinates the dynamic filter updates across all partitions. It stores the
250    /// bounds from the current partition, increments the completion counter, and when all
251    /// partitions have reported, creates an OR'd filter from individual partition bounds.
252    ///
253    /// This method is async and uses a [`tokio::sync::Barrier`] to wait for all partitions
254    /// to report their bounds. Once that occurs, the method will resolve for all callers and the
255    /// dynamic filter will be updated exactly once.
256    ///
257    /// # Note
258    ///
259    /// As barriers are reusable, it is likely an error to call this method more times than the
260    /// total number of partitions - as it can lead to pending futures that never resolve. We rely
261    /// on correct usage from the caller rather than imposing additional checks here. If this is a concern,
262    /// consider making the resulting future shared so the ready result can be reused.
263    ///
264    /// # Arguments
265    /// * `left_side_partition_id` - The identifier for the **left-side** partition reporting its bounds
266    /// * `partition_bounds` - The bounds computed by this partition (if any)
267    ///
268    /// # Returns
269    /// * `Result<()>` - Ok if successful, Err if filter update failed
270    pub(crate) async fn report_partition_bounds(
271        &self,
272        left_side_partition_id: usize,
273        partition_bounds: Option<Vec<ColumnBounds>>,
274    ) -> Result<()> {
275        // Store bounds in the accumulator - this runs once per partition
276        if let Some(bounds) = partition_bounds {
277            let mut guard = self.inner.lock();
278
279            let should_push = if let Some(last_bound) = guard.bounds.last() {
280                // In `PartitionMode::CollectLeft`, all streams on the left side share the same partition id (0).
281                // Since this function can be called multiple times for that same partition, we must deduplicate
282                // by checking against the last recorded bound.
283                last_bound.partition != left_side_partition_id
284            } else {
285                true
286            };
287
288            if should_push {
289                guard
290                    .bounds
291                    .push(PartitionBounds::new(left_side_partition_id, bounds));
292            }
293        }
294
295        if self.barrier.wait().await.is_leader() {
296            // All partitions have reported, so we can update the filter
297            let inner = self.inner.lock();
298            if !inner.bounds.is_empty() {
299                let filter_expr =
300                    self.create_filter_from_partition_bounds(&inner.bounds)?;
301                self.dynamic_filter.update(filter_expr)?;
302            }
303        }
304
305        Ok(())
306    }
307}
308
309impl fmt::Debug for SharedBoundsAccumulator {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        write!(f, "SharedBoundsAccumulator")
312    }
313}