datafusion_physical_optimizer/join_selection.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! The [`JoinSelection`] rule tries to modify a given plan so that it can
//! accommodate infinite sources and utilize statistical information (if there
//! is any) to obtain more performant plans. To achieve the first goal, it
//! tries to transform a non-runnable query (with the given infinite sources)
//! into a runnable query by replacing pipeline-breaking join operations with
//! pipeline-friendly ones. To achieve the second goal, it selects the proper
//! `PartitionMode` and the build side using the available statistics for hash joins.

use crate::PhysicalOptimizerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{internal_err, JoinSide, JoinType};
use datafusion_expr_common::sort_properties::SortProperties;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::utils::ColumnIndex;
use datafusion_physical_plan::joins::{
    CrossJoinExec, HashJoinExec, NestedLoopJoinExec, PartitionMode,
    StreamJoinPartitionMode, SymmetricHashJoinExec,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
use std::sync::Arc;

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
/// available statistical information, if there is any.
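///
/// # Example
///
/// A minimal usage sketch; `plan` is assumed to be an already-built physical
/// plan, and in practice this rule is applied for you as part of the default
/// physical optimizer passes:
///
/// ```ignore
/// use std::sync::Arc;
/// use datafusion_common::config::ConfigOptions;
///
/// let rule = JoinSelection::new();
/// let optimized = rule.optimize(Arc::clone(&plan), &ConfigOptions::default())?;
/// ```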
#[derive(Default, Debug)]
pub struct JoinSelection {}

impl JoinSelection {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

// TODO: Add performance tests for swapping Right Semi/Right joins to Left Semi/Left joins
//       when the right side is smaller, but not by a large margin.
// TODO: In PrestoSQL, the optimizer flips join sides only if one side is smaller than the
//       other by more than SIZE_DIFFERENCE_THRESHOLD times (8 times by default).
/// Checks the statistics of the join inputs to decide whether the join sides
/// should be swapped, i.e. whether the current right side is smaller and
/// should therefore become the build (left) side.
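///
/// For example, if the left input reports 10 MB of data in its statistics and
/// the right input reports 1 MB, this returns `Ok(true)` so the caller can
/// swap the inputs and build the hash table on the smaller side.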
pub(crate) fn should_swap_join_order(
    left: &dyn ExecutionPlan,
    right: &dyn ExecutionPlan,
) -> Result<bool> {
    // Get the total bytes of the left and right tables.
    // If both the left and right tables report `total_byte_size` statistics,
    // use `total_byte_size` to decide whether to swap; otherwise use `num_rows`.
    let left_stats = left.partition_statistics(None)?;
    let right_stats = right.partition_statistics(None)?;
    // First compare the `total_byte_size` of the left and right sides; if this
    // information is insufficient, fall back to `num_rows`.
    match (
        left_stats.total_byte_size.get_value(),
        right_stats.total_byte_size.get_value(),
    ) {
        (Some(l), Some(r)) => Ok(l > r),
        _ => match (
            left_stats.num_rows.get_value(),
            right_stats.num_rows.get_value(),
        ) {
            (Some(l), Some(r)) => Ok(l > r),
            _ => Ok(false),
        },
    }
}

fn supports_collect_by_thresholds(
    plan: &dyn ExecutionPlan,
    threshold_byte_size: usize,
    threshold_num_rows: usize,
) -> bool {
    // Currently we do not trust a 0 value from statistics, since statistics
    // collection might have bugs.
    // TODO: check the logic in datasource::get_statistics_with_limit()
    let Ok(stats) = plan.partition_statistics(None) else {
        return false;
    };

    if let Some(byte_size) = stats.total_byte_size.get_value() {
        *byte_size != 0 && *byte_size < threshold_byte_size
    } else if let Some(num_rows) = stats.num_rows.get_value() {
        *num_rows != 0 && *num_rows < threshold_num_rows
    } else {
        false
    }
}

impl PhysicalOptimizerRule for JoinSelection {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // First, we make pipeline-fixing modifications to joins so as to
        // accommodate unbounded inputs. Each pipeline-fixing subrule is a
        // function of type `PipelineFixerSubrule` that takes the current
        // [`ExecutionPlan`] and the configuration options, and returns a
        // (possibly rewritten) plan.
        let subrules: Vec<Box<PipelineFixerSubrule>> = vec![
            Box::new(hash_join_convert_symmetric_subrule),
            Box::new(hash_join_swap_subrule),
        ];
        let new_plan = plan
            .transform_up(|p| apply_subrules(p, &subrules, config))
            .data()?;
        // Next, we apply another subrule that tries to optimize joins using any
        // statistics their inputs might have.
        // - For a hash join with partition mode [`PartitionMode::Auto`], we will
        //   make a cost-based decision to select which `PartitionMode`
        //   (`Partitioned`/`CollectLeft`) is optimal. If the statistics information
        //   is not available, we will fall back to [`PartitionMode::Partitioned`].
        // - We optimize/swap join sides so that the left (build) side of the join
        //   is the small side. If the statistics information is not available, we
        //   do not modify join sides.
        // - We will also swap left and right sides for cross joins so that the left
        //   side is the small side.
        let config = &config.optimizer;
        let collect_threshold_byte_size = config.hash_join_single_partition_threshold;
        let collect_threshold_num_rows = config.hash_join_single_partition_threshold_rows;
        new_plan
            .transform_up(|plan| {
                statistical_join_selection_subrule(
                    plan,
                    collect_threshold_byte_size,
                    collect_threshold_num_rows,
                )
            })
            .data()
    }

    fn name(&self) -> &str {
        "join_selection"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

/// Tries to create a [`HashJoinExec`] in [`PartitionMode::CollectLeft`] when possible.
///
/// This function first checks whether the given join type allows `CollectLeft`
/// mode, possibly after swapping the join sides. When `ignore_threshold` is
/// false, it also checks the left and right input sizes, in bytes or rows,
/// against the collection thresholds.
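///
/// Depending on which sides satisfy the thresholds, the outcome is:
/// - both sides: `CollectLeft`, with the inputs swapped if the join type
///   supports swapping and the right side is smaller;
/// - only the left side: `CollectLeft` without swapping;
/// - only the right side: `CollectLeft` with swapped inputs if the join type
///   supports swapping, otherwise `None`;
/// - neither side: `None`, letting the caller fall back to a partitioned join.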
pub(crate) fn try_collect_left(
    hash_join: &HashJoinExec,
    ignore_threshold: bool,
    threshold_byte_size: usize,
    threshold_num_rows: usize,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
    let left = hash_join.left();
    let right = hash_join.right();

    let left_can_collect = ignore_threshold
        || supports_collect_by_thresholds(
            &**left,
            threshold_byte_size,
            threshold_num_rows,
        );
    let right_can_collect = ignore_threshold
        || supports_collect_by_thresholds(
            &**right,
            threshold_byte_size,
            threshold_num_rows,
        );

    match (left_can_collect, right_can_collect) {
        (true, true) => {
            if hash_join.join_type().supports_swap()
                && should_swap_join_order(&**left, &**right)?
            {
                Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
            } else {
                Ok(Some(Arc::new(HashJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    hash_join.on().to_vec(),
                    hash_join.filter().cloned(),
                    hash_join.join_type(),
                    hash_join.projection.clone(),
                    PartitionMode::CollectLeft,
                    hash_join.null_equality(),
                )?)))
            }
        }
        (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new(
            Arc::clone(left),
            Arc::clone(right),
            hash_join.on().to_vec(),
            hash_join.filter().cloned(),
            hash_join.join_type(),
            hash_join.projection.clone(),
            PartitionMode::CollectLeft,
            hash_join.null_equality(),
        )?))),
        (false, true) => {
            if hash_join.join_type().supports_swap() {
                hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
            } else {
                Ok(None)
            }
        }
        (false, false) => Ok(None),
    }
}

/// Creates a partitioned hash join execution plan, swapping inputs if beneficial.
///
/// Checks if the join order should be swapped based on the join type and input statistics.
/// If swapping is optimal and supported, creates a swapped partitioned hash join; otherwise,
/// creates a standard partitioned hash join.
pub(crate) fn partitioned_hash_join(
    hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let left = hash_join.left();
    let right = hash_join.right();
    if hash_join.join_type().supports_swap() && should_swap_join_order(&**left, &**right)?
    {
        hash_join.swap_inputs(PartitionMode::Partitioned)
    } else {
        Ok(Arc::new(HashJoinExec::try_new(
            Arc::clone(left),
            Arc::clone(right),
            hash_join.on().to_vec(),
            hash_join.filter().cloned(),
            hash_join.join_type(),
            hash_join.projection.clone(),
            PartitionMode::Partitioned,
            hash_join.null_equality(),
        )?))
    }
}

/// This subrule optimizes hash, cross, and nested loop joins in the plan
/// according to available statistical information.
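///
/// Specifically:
/// - a [`HashJoinExec`] in `Auto` or `CollectLeft` mode goes through
///   [`try_collect_left`], falling back to [`partitioned_hash_join`];
/// - a [`HashJoinExec`] in `Partitioned` mode, a [`CrossJoinExec`], or a
///   [`NestedLoopJoinExec`] has its inputs swapped when the statistics
///   indicate the right side is smaller (and, where relevant, the join type
///   supports swapping).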
fn statistical_join_selection_subrule(
    plan: Arc<dyn ExecutionPlan>,
    collect_threshold_byte_size: usize,
    collect_threshold_num_rows: usize,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    let transformed =
        if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
            match hash_join.partition_mode() {
                PartitionMode::Auto => try_collect_left(
                    hash_join,
                    false,
                    collect_threshold_byte_size,
                    collect_threshold_num_rows,
                )?
                .map_or_else(
                    || partitioned_hash_join(hash_join).map(Some),
                    |v| Ok(Some(v)),
                )?,
                PartitionMode::CollectLeft => try_collect_left(hash_join, true, 0, 0)?
                    .map_or_else(
                        || partitioned_hash_join(hash_join).map(Some),
                        |v| Ok(Some(v)),
                    )?,
                PartitionMode::Partitioned => {
                    let left = hash_join.left();
                    let right = hash_join.right();
                    if hash_join.join_type().supports_swap()
                        && should_swap_join_order(&**left, &**right)?
                    {
                        hash_join
                            .swap_inputs(PartitionMode::Partitioned)
                            .map(Some)?
                    } else {
                        None
                    }
                }
            }
        } else if let Some(cross_join) = plan.as_any().downcast_ref::<CrossJoinExec>() {
            let left = cross_join.left();
            let right = cross_join.right();
            if should_swap_join_order(&**left, &**right)? {
                cross_join.swap_inputs().map(Some)?
            } else {
                None
            }
        } else if let Some(nl_join) = plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
            let left = nl_join.left();
            let right = nl_join.right();
            if nl_join.join_type().supports_swap()
                && should_swap_join_order(&**left, &**right)?
            {
                nl_join.swap_inputs().map(Some)?
            } else {
                None
            }
        } else {
            None
        };

    Ok(if let Some(transformed) = transformed {
        Transformed::yes(transformed)
    } else {
        Transformed::no(plan)
    })
}

/// Pipeline-fixing join selection subrule.
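///
/// # Example
///
/// A minimal sketch of a custom subrule with this signature (a hypothetical
/// no-op that returns its input unchanged):
///
/// ```ignore
/// let noop: Box<PipelineFixerSubrule> = Box::new(|plan, _config| Ok(plan));
/// ```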
pub type PipelineFixerSubrule =
    dyn Fn(Arc<dyn ExecutionPlan>, &ConfigOptions) -> Result<Arc<dyn ExecutionPlan>>;

/// Converts a hash join to a symmetric hash join if both its inputs are
/// unbounded and incremental.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
/// otherwise, it leaves the input unchanged.
///
/// # Arguments
/// * `input` - The current execution plan.
/// * `config_options` - Configuration options that might affect the transformation logic.
///
/// # Returns
/// `Ok` with the [`SymmetricHashJoinExec`] replacement if the transformation is applicable,
/// `Ok` with the unchanged input plan if it is not, or `Err` if creating the symmetric
/// hash join fails.
fn hash_join_convert_symmetric_subrule(
    input: Arc<dyn ExecutionPlan>,
    config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Check if the current plan node is a HashJoinExec.
    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
        let left_unbounded = hash_join.left.boundedness().is_unbounded();
        let left_incremental = matches!(
            hash_join.left.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
        let right_unbounded = hash_join.right.boundedness().is_unbounded();
        let right_incremental = matches!(
            hash_join.right.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
        // Process only if both left and right sides are unbounded and incrementally emit.
        if left_unbounded && right_unbounded && left_incremental && right_incremental {
            // Determine the partition mode based on configuration.
            let mode = if config_options.optimizer.repartition_joins {
                StreamJoinPartitionMode::Partitioned
            } else {
                StreamJoinPartitionMode::SinglePartition
            };
            // A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
            // This function checks if the columns involved in the filter have any specific ordering requirements.
            // If the child nodes (left or right side of the join) already have a defined order and the columns used in the
            // filter predicate are ordered, this function captures that ordering requirement. The identified order is then
            // used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
            // However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
            // the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
            // ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
            // based on the properties of the child nodes and the filter condition.
            let determine_order = |side: JoinSide| -> Option<LexOrdering> {
                hash_join
                    .filter()
                    .map(|filter| {
                        filter.column_indices().iter().any(
                            |ColumnIndex {
                                 index,
                                 side: column_side,
                             }| {
                                // Skip if column side does not match the join side.
                                if *column_side != side {
                                    return false;
                                }
                                // Retrieve equivalence properties and schema based on the side.
                                let (equivalence, schema) = match side {
                                    JoinSide::Left => (
                                        hash_join.left().equivalence_properties(),
                                        hash_join.left().schema(),
                                    ),
                                    JoinSide::Right => (
                                        hash_join.right().equivalence_properties(),
                                        hash_join.right().schema(),
                                    ),
                                    JoinSide::None => return false,
                                };

                                let name = schema.field(*index).name();
                                let col = Arc::new(Column::new(name, *index)) as _;
                                // Check if the column is ordered.
                                equivalence.get_expr_properties(col).sort_properties
                                    != SortProperties::Unordered
                            },
                        )
                    })
                    .unwrap_or(false)
                    .then(|| {
                        match side {
                            JoinSide::Left => hash_join.left().output_ordering(),
                            JoinSide::Right => hash_join.right().output_ordering(),
                            JoinSide::None => unreachable!(),
                        }
                        .cloned()
                    })
                    .flatten()
            };

            // Determine the sort order for both left and right sides.
            let left_order = determine_order(JoinSide::Left);
            let right_order = determine_order(JoinSide::Right);

            return SymmetricHashJoinExec::try_new(
                Arc::clone(hash_join.left()),
                Arc::clone(hash_join.right()),
                hash_join.on().to_vec(),
                hash_join.filter().cloned(),
                hash_join.join_type(),
                hash_join.null_equality(),
                left_order,
                right_order,
                mode,
            )
            .map(|exec| Arc::new(exec) as _);
        }
    }
    Ok(input)
}

/// This subrule will swap build/probe sides of a hash join depending on whether
/// one of its inputs may produce an infinite stream of records. The rule ensures
/// that the left (build) side of the hash join always operates on an input stream
/// that will produce a finite set of records. If the left side cannot be chosen
/// to be "finite", the join sides stay the same as the original query.
/// ```text
/// For example, this rule makes the following transformation:
///
///
///
///           +--------------+              +--------------+
///           |              |  unbounded   |              |
///    Left   | Infinite     |    true      | Hash         |\true
///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
///           |              |              |              |  \  |              |       |              |
///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
///                                                            - |              |       |              |
///           +--------------+              +--------------+  /  +--------------+       +--------------+
///           |              |  unbounded   |              | /
///    Right  | Finite       |    false     | Hash         |/false
///           | Data Source  |--------------| Repartition  |
///           |              |              |              |
///           +--------------+              +--------------+
///
///
///
///           +--------------+              +--------------+
///           |              |  unbounded   |              |
///    Left   | Finite       |    false     | Hash         |\false
///           | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
///           |              |              |              |  \  |              | true  |              | true
///           +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
///                                                            - |              |       |              |
///           +--------------+              +--------------+  /  +--------------+       +--------------+
///           |              |  unbounded   |              | /
///    Right  | Infinite     |    true      | Hash         |/true
///           | Data Source  |--------------| Repartition  |
///           |              |              |              |
///           +--------------+              +--------------+
/// ```
pub fn hash_join_swap_subrule(
    mut input: Arc<dyn ExecutionPlan>,
    _config_options: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
    if let Some(hash_join) = input.as_any().downcast_ref::<HashJoinExec>() {
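        // Swap the join sides only when the current left (build) side is
        // unbounded, the right side is bounded, and the join type remains
        // valid after the sides are exchanged.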
        if hash_join.left.boundedness().is_unbounded()
            && !hash_join.right.boundedness().is_unbounded()
            && matches!(
                *hash_join.join_type(),
                JoinType::Inner
                    | JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
            )
        {
            input = swap_join_according_to_unboundedness(hash_join)?;
        }
    }
    Ok(input)
}

/// This function swaps the sides of a hash join to make it runnable even if one
/// of its inputs is infinite. Note that this is not always possible; i.e.
/// [`JoinType::Full`], [`JoinType::Right`], [`JoinType::RightAnti`],
/// [`JoinType::RightSemi`] and [`JoinType::RightMark`] cannot run with an
/// unbounded left side, even if we swap join sides. Therefore, we do not
/// consider them here.
/// This function is crate public as it is useful for downstream projects
/// to implement, or experiment with, their own join selection rules.
pub(crate) fn swap_join_according_to_unboundedness(
    hash_join: &HashJoinExec,
) -> Result<Arc<dyn ExecutionPlan>> {
    let partition_mode = hash_join.partition_mode();
    let join_type = hash_join.join_type();
    match (*partition_mode, *join_type) {
        (
            _,
            JoinType::Right
            | JoinType::RightSemi
            | JoinType::RightAnti
            | JoinType::RightMark
            | JoinType::Full,
        ) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
        (PartitionMode::Partitioned, _) => {
            hash_join.swap_inputs(PartitionMode::Partitioned)
        }
        (PartitionMode::CollectLeft, _) => {
            hash_join.swap_inputs(PartitionMode::CollectLeft)
        }
        (PartitionMode::Auto, _) => {
            // Use `PartitionMode::Partitioned` as default if `Auto` is selected.
            hash_join.swap_inputs(PartitionMode::Partitioned)
        }
    }
}

/// Applies the given `PipelineFixerSubrule`s to the given plan and returns the
/// resulting (possibly rewritten) plan.
fn apply_subrules(
    mut input: Arc<dyn ExecutionPlan>,
    subrules: &Vec<Box<PipelineFixerSubrule>>,
    config_options: &ConfigOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
    for subrule in subrules {
        input = subrule(input, config_options)?;
    }
    Ok(Transformed::yes(input))
}

// See tests in datafusion/core/tests/physical_optimizer