datafusion_physical_optimizer/enforce_distribution.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! EnforceDistribution optimizer rule inspects the physical plan with respect
//! to distribution requirements and adds [`RepartitionExec`]s to satisfy them
//! when necessary. If increasing parallelism is beneficial (and also desirable
//! according to the configuration), this rule increases partition counts in
//! the physical plan.
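//!
//! As a minimal usage sketch (assuming an existing `plan: Arc<dyn ExecutionPlan>`
//! and default configuration), the rule is applied like any other
//! [`PhysicalOptimizerRule`]:
//!
//! ```text
//! let optimized = EnforceDistribution::new()
//!     .optimize(plan, &ConfigOptions::default())?;
//! ```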

use std::fmt::Debug;
use std::sync::Arc;

use crate::optimizer::PhysicalOptimizerRule;
use crate::output_requirements::OutputRequirementExec;
use crate::utils::{
    add_sort_above_with_check, is_coalesce_partitions, is_repartition,
    is_sort_preserving_merge,
};

use arrow::compute::SortOptions;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::expressions::{Column, NoOp};
use datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
    physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
use datafusion_physical_plan::aggregates::{
    AggregateExec, AggregateMode, PhysicalGroupBy,
};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::execution_plan::EmissionType;
use datafusion_physical_plan::joins::{
    CrossJoinExec, HashJoinExec, PartitionMode, SortMergeJoinExec,
};
use datafusion_physical_plan::projection::{ProjectionExec, ProjectionExpr};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::tree_node::PlanContext;
use datafusion_physical_plan::union::{can_interleave, InterleaveExec, UnionExec};
use datafusion_physical_plan::windows::WindowAggExec;
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
use datafusion_physical_plan::{Distribution, ExecutionPlan, Partitioning};

use itertools::izip;

/// The `EnforceDistribution` rule ensures that distribution requirements are
/// met. In doing so, this rule will increase the parallelism in the plan by
/// introducing repartitioning operators to the physical plan.
///
/// For example, given an input such as:
///
/// ```text
/// ┌─────────────────────────────────┐
/// │                                 │
/// │          ExecutionPlan          │
/// │                                 │
/// └─────────────────────────────────┘
///             ▲         ▲
///             │         │
///       ┌─────┘         └─────┐
///       │                     │
///       │                     │
///       │                     │
/// ┌───────────┐         ┌───────────┐
/// │           │         │           │
/// │ batch A1  │         │ batch B1  │
/// │           │         │           │
/// ├───────────┤         ├───────────┤
/// │           │         │           │
/// │ batch A2  │         │ batch B2  │
/// │           │         │           │
/// ├───────────┤         ├───────────┤
/// │           │         │           │
/// │ batch A3  │         │ batch B3  │
/// │           │         │           │
/// └───────────┘         └───────────┘
///
///      Input                 Input
///        A                     B
/// ```
///
/// This rule will attempt to add a `RepartitionExec` to increase parallelism
/// (to 3, in this case) and create the following arrangement:
///
/// ```text
/// ┌─────────────────────────────────┐
/// │                                 │
/// │          ExecutionPlan          │
/// │                                 │
/// └─────────────────────────────────┘
///             ▲   ▲   ▲              Input now has 3
///             │   │   │              partitions
///     ┌───────┘   │   └───────┐
///     │           │           │
///     │           │           │
/// ┌───────────┐ ┌───────────┐ ┌───────────┐
/// │           │ │           │ │           │
/// │ batch A1  │ │ batch A3  │ │ batch B3  │
/// │           │ │           │ │           │
/// ├───────────┤ ├───────────┤ ├───────────┤
/// │           │ │           │ │           │
/// │ batch B2  │ │ batch B1  │ │ batch A2  │
/// │           │ │           │ │           │
/// └───────────┘ └───────────┘ └───────────┘
///       ▲             ▲             ▲
///       │             │             │
///       └─────────┐   │   ┌─────────┘
///                 │   │   │
///                 │   │   │
/// ┌─────────────────────────────────┐    batches are
/// │       RepartitionExec(3)        │    repartitioned
/// │           RoundRobin            │
/// │                                 │
/// └─────────────────────────────────┘
///             ▲         ▲
///             │         │
///       ┌─────┘         └─────┐
///       │                     │
///       │                     │
///       │                     │
/// ┌───────────┐         ┌───────────┐
/// │           │         │           │
/// │ batch A1  │         │ batch B1  │
/// │           │         │           │
/// ├───────────┤         ├───────────┤
/// │           │         │           │
/// │ batch A2  │         │ batch B2  │
/// │           │         │           │
/// ├───────────┤         ├───────────┤
/// │           │         │           │
/// │ batch A3  │         │ batch B3  │
/// │           │         │           │
/// └───────────┘         └───────────┘
///
///
///      Input                 Input
///        A                     B
/// ```
///
/// The `EnforceDistribution` rule
/// - is idempotent; i.e. it can be applied multiple times, each time producing
///   the same result.
/// - always produces a valid plan in terms of distribution requirements. Its
///   input plan can be valid or invalid with respect to distribution requirements,
///   but the output plan will always be valid.
/// - produces a valid plan in terms of ordering requirements, *if* its input is
///   a valid plan in terms of ordering requirements. If the input plan is invalid,
///   this rule does not attempt to fix it, as doing so is the responsibility of the
///   `EnforceSorting` rule.
///
/// Note that distribution requirements are met in the strictest way. This may
/// result in more [`RepartitionExec`]s than strictly necessary in the plan, but
/// meeting the requirements in the strictest way may help avoid possible data
/// skew in joins.
///
/// For example, for a hash join with keys (a, b, c), the required
/// Distribution(a, b, c) can be satisfied by several alternative partitionings:
/// (a, b, c), (a, b), (a, c), (b, c), (a), (b), (c) and ( ).
///
/// This rule only chooses the exact match, i.e. it satisfies Distribution(a, b, c)
/// with a HashPartition(a, b, c).
#[derive(Default, Debug)]
pub struct EnforceDistribution {}

impl EnforceDistribution {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

impl PhysicalOptimizerRule for EnforceDistribution {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let top_down_join_key_reordering = config.optimizer.top_down_join_key_reordering;

        let adjusted = if top_down_join_key_reordering {
            // Run a top-down process to adjust input key ordering recursively
            let plan_requirements = PlanWithKeyRequirements::new_default(plan);
            let adjusted = plan_requirements
                .transform_down(adjust_input_keys_ordering)
                .data()?;
            adjusted.plan
        } else {
            // Run a bottom-up process
            plan.transform_up(|plan| {
                Ok(Transformed::yes(reorder_join_keys_to_inputs(plan)?))
            })
            .data()?
        };

        let distribution_context = DistributionContext::new_default(adjusted);
        // Distribution enforcement needs to be applied bottom-up.
        let distribution_context = distribution_context
            .transform_up(|distribution_context| {
                ensure_distribution(distribution_context, config)
            })
            .data()?;
        Ok(distribution_context.plan)
    }

    fn name(&self) -> &str {
        "EnforceDistribution"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

#[derive(Debug, Clone)]
struct JoinKeyPairs {
    left_keys: Vec<Arc<dyn PhysicalExpr>>,
    right_keys: Vec<Arc<dyn PhysicalExpr>>,
}

/// Keeps track of parent required key orderings.
pub type PlanWithKeyRequirements = PlanContext<Vec<Arc<dyn PhysicalExpr>>>;

/// When the physical planner creates joins, the ordering of the join keys comes
/// from the original query. That might not match the output partitioning of the
/// join node's children. A top-down process uses this method to adjust children's
/// output partitioning based on the parent key reordering requirements:
///
/// Example:
/// TopJoin on (a, b, c)
///     bottom left join on(b, a, c)
///     bottom right join on(c, b, a)
///
/// Will be adjusted to:
/// TopJoin on (a, b, c)
///     bottom left join on(a, b, c)
///     bottom right join on(a, b, c)
///
/// Example:
/// TopJoin on (a, b, c)
///     Agg1 group by (b, a, c)
///     Agg2 group by (c, b, a)
///
/// Will be adjusted to:
/// TopJoin on (a, b, c)
///     Projection(b, a, c)
///         Agg1 group by (a, b, c)
///     Projection(c, b, a)
///         Agg2 group by (a, b, c)
///
/// The reordering process works as follows:
///
/// 1) If the current plan is a partitioned HashJoin or a SortMergeJoin, check
///    whether the requirements can be satisfied by adjusting the join key order:
///    If the requirements cannot be satisfied, clear the current requirements,
///    generate new requirements (to push down) based on the current join keys,
///    and return the unchanged plan.
///    If the requirements are already satisfied, clear the current requirements,
///    generate new requirements (to push down) based on the current join keys,
///    and return the unchanged plan.
///    If the requirements can be satisfied by adjusting the key order, clear the
///    current requirements, generate new requirements (to push down) based on
///    the adjusted join keys, and return the changed plan.
///
/// 2) If the current plan is an Aggregation, check whether the requirements can
///    be satisfied by adjusting the group-by key order:
///    If the requirements cannot be satisfied, clear all the requirements and
///    return the unchanged plan.
///    If the requirements are already satisfied, clear all the requirements and
///    return the unchanged plan.
///    If the requirements can be satisfied by adjusting the key order, clear all
///    the requirements and return the changed plan.
///
/// 3) If the current plan is a RepartitionExec, CoalescePartitionsExec or
///    WindowAggExec, clear all the requirements and return the unchanged plan.
/// 4) If the current plan is a Projection, transform the requirements to the
///    columns before the Projection and push down the requirements.
/// 5) For other types of operators, push down the parent requirements to
///    children by default.
pub fn adjust_input_keys_ordering(
    mut requirements: PlanWithKeyRequirements,
) -> Result<Transformed<PlanWithKeyRequirements>> {
    let plan = Arc::clone(&requirements.plan);

    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equality,
        ..
    }) = plan.as_any().downcast_ref::<HashJoinExec>()
    {
        match mode {
            PartitionMode::Partitioned => {
                let join_constructor = |new_conditions: (
                    Vec<(PhysicalExprRef, PhysicalExprRef)>,
                    Vec<SortOptions>,
                )| {
                    HashJoinExec::try_new(
                        Arc::clone(left),
                        Arc::clone(right),
                        new_conditions.0,
                        filter.clone(),
                        join_type,
                        // TODO: projection is not used in the join here because
                        // projection pushdown runs after enforce_distribution.
                        // We may need to handle it later. Same for filter.
                        projection.clone(),
                        PartitionMode::Partitioned,
                        *null_equality,
                    )
                    .map(|e| Arc::new(e) as _)
                };
                return reorder_partitioned_join_keys(
                    requirements,
                    on,
                    &[],
                    &join_constructor,
                )
                .map(Transformed::yes);
            }
            PartitionMode::CollectLeft => {
                // Push down requirements to the right side
                requirements.children[1].data = match join_type {
                    JoinType::Inner | JoinType::Right => shift_right_required(
                        &requirements.data,
                        left.schema().fields().len(),
                    )
                    .unwrap_or_default(),
                    JoinType::RightSemi | JoinType::RightAnti | JoinType::RightMark => {
                        requirements.data.clone()
                    }
                    JoinType::Left
                    | JoinType::LeftSemi
                    | JoinType::LeftAnti
                    | JoinType::Full
                    | JoinType::LeftMark => vec![],
                };
            }
            PartitionMode::Auto => {
                // Cannot satisfy; clear the current requirements and generate
                // new empty requirements.
                requirements.data.clear();
            }
        }
    } else if let Some(CrossJoinExec { left, .. }) =
        plan.as_any().downcast_ref::<CrossJoinExec>()
    {
        let left_columns_len = left.schema().fields().len();
        // Push down requirements to the right side
        requirements.children[1].data =
            shift_right_required(&requirements.data, left_columns_len)
                .unwrap_or_default();
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equality,
        ..
    }) = plan.as_any().downcast_ref::<SortMergeJoinExec>()
    {
        let join_constructor = |new_conditions: (
            Vec<(PhysicalExprRef, PhysicalExprRef)>,
            Vec<SortOptions>,
        )| {
            SortMergeJoinExec::try_new(
                Arc::clone(left),
                Arc::clone(right),
                new_conditions.0,
                filter.clone(),
                *join_type,
                new_conditions.1,
                *null_equality,
            )
            .map(|e| Arc::new(e) as _)
        };
        return reorder_partitioned_join_keys(
            requirements,
            on,
            sort_options,
            &join_constructor,
        )
        .map(Transformed::yes);
    } else if let Some(aggregate_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
        if !requirements.data.is_empty() {
            if aggregate_exec.mode() == &AggregateMode::FinalPartitioned {
                return reorder_aggregate_keys(requirements, aggregate_exec)
                    .map(Transformed::yes);
            } else {
                requirements.data.clear();
            }
        } else {
            // Keep everything unchanged
            return Ok(Transformed::no(requirements));
        }
    } else if let Some(proj) = plan.as_any().downcast_ref::<ProjectionExec>() {
        let expr = proj.expr();
        // For a Projection, we need to transform the requirements to the
        // columns before the Projection and then push down the requirements.
        // Construct a mapping from the new name to the original Column:
        let proj_exprs: Vec<_> = expr
            .iter()
            .map(|p| (Arc::clone(&p.expr), p.alias.clone()))
            .collect();
        let new_required = map_columns_before_projection(&requirements.data, &proj_exprs);
        if new_required.len() == requirements.data.len() {
            requirements.children[0].data = new_required;
        } else {
            // Cannot satisfy; clear the current requirements and generate
            // new empty requirements.
            requirements.data.clear();
        }
    } else if plan.as_any().downcast_ref::<RepartitionExec>().is_some()
        || plan
            .as_any()
            .downcast_ref::<CoalescePartitionsExec>()
            .is_some()
        || plan.as_any().downcast_ref::<WindowAggExec>().is_some()
    {
        requirements.data.clear();
    } else {
        // By default, push down the parent requirements to children
        for child in requirements.children.iter_mut() {
            child.data.clone_from(&requirements.data);
        }
    }
    Ok(Transformed::yes(requirements))
}

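/// Reorders the join keys of a partitioned join (hash or sort-merge) to match
/// the parent's required key order where possible, rebuilding the join node
/// through `join_constructor`, and pushes the resulting key order down to both
/// children as their new requirements.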
pub fn reorder_partitioned_join_keys<F>(
    mut join_plan: PlanWithKeyRequirements,
    on: &[(PhysicalExprRef, PhysicalExprRef)],
    sort_options: &[SortOptions],
    join_constructor: &F,
) -> Result<PlanWithKeyRequirements>
where
    F: Fn(
        (Vec<(PhysicalExprRef, PhysicalExprRef)>, Vec<SortOptions>),
    ) -> Result<Arc<dyn ExecutionPlan>>,
{
    let parent_required = &join_plan.data;
    let join_key_pairs = extract_join_keys(on);
    let eq_properties = join_plan.plan.equivalence_properties();

    let (
        JoinKeyPairs {
            left_keys,
            right_keys,
        },
        positions,
    ) = try_reorder(join_key_pairs, parent_required, eq_properties);

    if let Some(positions) = positions {
        if !positions.is_empty() {
            let new_join_on = new_join_conditions(&left_keys, &right_keys);
            let new_sort_options = (0..sort_options.len())
                .map(|idx| sort_options[positions[idx]])
                .collect();
            join_plan.plan = join_constructor((new_join_on, new_sort_options))?;
        }
    }

    join_plan.children[0].data = left_keys;
    join_plan.children[1].data = right_keys;
    Ok(join_plan)
}

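/// Adjusts the group-by key order of a `FinalPartitioned` aggregate (and its
/// `Partial` input aggregate) to match the parent's required key order, then
/// restores the original column order with a trailing projection so that the
/// output schema stays the same.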
pub fn reorder_aggregate_keys(
    mut agg_node: PlanWithKeyRequirements,
    agg_exec: &AggregateExec,
) -> Result<PlanWithKeyRequirements> {
    let parent_required = &agg_node.data;
    let output_columns = agg_exec
        .group_expr()
        .expr()
        .iter()
        .enumerate()
        .map(|(index, (_, name))| Column::new(name, index))
        .collect::<Vec<_>>();

    let output_exprs = output_columns
        .iter()
        .map(|c| Arc::new(c.clone()) as _)
        .collect::<Vec<_>>();

    if parent_required.len() == output_exprs.len()
        && agg_exec.group_expr().null_expr().is_empty()
        && !physical_exprs_equal(&output_exprs, parent_required)
    {
        if let Some(positions) = expected_expr_positions(&output_exprs, parent_required) {
            if let Some(agg_exec) =
                agg_exec.input().as_any().downcast_ref::<AggregateExec>()
            {
                if matches!(agg_exec.mode(), &AggregateMode::Partial) {
                    let group_exprs = agg_exec.group_expr().expr();
                    let new_group_exprs = positions
                        .into_iter()
                        .map(|idx| group_exprs[idx].clone())
                        .collect();
                    let partial_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::Partial,
                        PhysicalGroupBy::new_single(new_group_exprs),
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        Arc::clone(agg_exec.input()),
                        Arc::clone(&agg_exec.input_schema),
                    )?);
                    // Build new group expressions that correspond to the output
                    // of the "reordered" aggregator:
                    let group_exprs = partial_agg.group_expr().expr();
                    let new_group_by = PhysicalGroupBy::new_single(
                        partial_agg
                            .output_group_expr()
                            .into_iter()
                            .enumerate()
                            .map(|(idx, expr)| (expr, group_exprs[idx].1.clone()))
                            .collect(),
                    );
                    let new_final_agg = Arc::new(AggregateExec::try_new(
                        AggregateMode::FinalPartitioned,
                        new_group_by,
                        agg_exec.aggr_expr().to_vec(),
                        agg_exec.filter_expr().to_vec(),
                        Arc::clone(&partial_agg) as _,
                        agg_exec.input_schema(),
                    )?);

                    agg_node.plan = Arc::clone(&new_final_agg) as _;
                    agg_node.data.clear();
                    agg_node.children = vec![PlanWithKeyRequirements::new(
                        partial_agg as _,
                        vec![],
                        agg_node.children.swap_remove(0).children,
                    )];

                    // Need to create a new projection to change the expr ordering back
                    let agg_schema = new_final_agg.schema();
                    let mut proj_exprs = output_columns
                        .iter()
                        .map(|col| {
                            let name = col.name();
                            let index = agg_schema.index_of(name)?;
                            Ok(ProjectionExpr {
                                expr: Arc::new(Column::new(name, index)) as _,
                                alias: name.to_owned(),
                            })
                        })
                        .collect::<Result<Vec<_>>>()?;
                    let agg_fields = agg_schema.fields();
                    for (idx, field) in
                        agg_fields.iter().enumerate().skip(output_columns.len())
                    {
                        let name = field.name();
                        let plan = Arc::new(Column::new(name, idx)) as _;
                        proj_exprs.push(ProjectionExpr {
                            expr: plan,
                            alias: name.clone(),
                        })
                    }
                    return ProjectionExec::try_new(proj_exprs, new_final_agg).map(|p| {
                        PlanWithKeyRequirements::new(Arc::new(p), vec![], vec![agg_node])
                    });
                }
            }
        }
    }
    Ok(agg_node)
}

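/// Re-expresses parent requirements that refer to right-side columns of a join
/// in terms of the right child's schema by shifting column indices left by
/// `left_columns_len`. Returns `None` if any required expression refers to a
/// left-side column, since then the requirements cannot be pushed down to the
/// right child alone (e.g. with `left_columns_len = 3`, `Column("x", 4)`
/// becomes `Column("x", 1)`, while `Column("y", 1)` yields `None`).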
fn shift_right_required(
    parent_required: &[Arc<dyn PhysicalExpr>],
    left_columns_len: usize,
) -> Option<Vec<Arc<dyn PhysicalExpr>>> {
    let new_right_required = parent_required
        .iter()
        .filter_map(|r| {
            r.as_any().downcast_ref::<Column>().and_then(|col| {
                col.index()
                    .checked_sub(left_columns_len)
                    .map(|index| Arc::new(Column::new(col.name(), index)) as _)
            })
        })
        .collect::<Vec<_>>();

    // If all parent requirements come from the right side, they can be
    // pushed down:
    (new_right_required.len() == parent_required.len()).then_some(new_right_required)
}

/// When the physical planner creates joins, the ordering of the join keys comes
/// from the original query. That might not match the output partitioning of the
/// join node's children. This method tries to change the ordering of the join
/// keys to match the partitioning of the join node's children. If it cannot
/// match both sides, it tries to match one side, either the left or the right.
///
/// Example:
/// TopJoin on (a, b, c)
///     bottom left join on(b, a, c)
///     bottom right join on(c, b, a)
///
/// Will be adjusted to:
/// TopJoin on (b, a, c)
///     bottom left join on(b, a, c)
///     bottom right join on(c, b, a)
///
/// Compared to the top-down reordering process, this bottom-up approach is much
/// simpler, but it might not reach the best possible result. The bottom-up
/// approach will be useful in the future if we plan to support storage
/// partition-wise joins. In that case, the datasources/tables might be
/// pre-partitioned; since we cannot adjust the key ordering of the datasources,
/// we cannot apply the top-down reordering process.
pub fn reorder_join_keys_to_inputs(
    plan: Arc<dyn ExecutionPlan>,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan_any = plan.as_any();
    if let Some(HashJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        projection,
        mode,
        null_equality,
        ..
    }) = plan_any.downcast_ref::<HashJoinExec>()
    {
        if matches!(mode, PartitionMode::Partitioned) {
            let (join_keys, positions) = reorder_current_join_keys(
                extract_join_keys(on),
                Some(left.output_partitioning()),
                Some(right.output_partitioning()),
                left.equivalence_properties(),
                right.equivalence_properties(),
            );
            if positions.is_some_and(|idxs| !idxs.is_empty()) {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                return Ok(Arc::new(HashJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    new_join_on,
                    filter.clone(),
                    join_type,
                    projection.clone(),
                    PartitionMode::Partitioned,
                    *null_equality,
                )?));
            }
        }
    } else if let Some(SortMergeJoinExec {
        left,
        right,
        on,
        filter,
        join_type,
        sort_options,
        null_equality,
        ..
    }) = plan_any.downcast_ref::<SortMergeJoinExec>()
    {
        let (join_keys, positions) = reorder_current_join_keys(
            extract_join_keys(on),
            Some(left.output_partitioning()),
            Some(right.output_partitioning()),
            left.equivalence_properties(),
            right.equivalence_properties(),
        );
        if let Some(positions) = positions {
            if !positions.is_empty() {
                let JoinKeyPairs {
                    left_keys,
                    right_keys,
                } = join_keys;
                let new_join_on = new_join_conditions(&left_keys, &right_keys);
                let new_sort_options = (0..sort_options.len())
                    .map(|idx| sort_options[positions[idx]])
                    .collect();
                return SortMergeJoinExec::try_new(
                    Arc::clone(left),
                    Arc::clone(right),
                    new_join_on,
                    filter.clone(),
                    *join_type,
                    new_sort_options,
                    *null_equality,
                )
                .map(|smj| Arc::new(smj) as _);
            }
        }
    }
    Ok(plan)
}

/// Reorders the current join keys based on the left child's or the right
/// child's hash partitioning, preferring the left side when both sides are
/// hash partitioned.
fn reorder_current_join_keys(
    join_keys: JoinKeyPairs,
    left_partition: Option<&Partitioning>,
    right_partition: Option<&Partitioning>,
    left_equivalence_properties: &EquivalenceProperties,
    right_equivalence_properties: &EquivalenceProperties,
) -> (JoinKeyPairs, Option<Vec<usize>>) {
    match (left_partition, right_partition) {
        (Some(Partitioning::Hash(left_exprs, _)), _) => {
            match try_reorder(join_keys, left_exprs, left_equivalence_properties) {
                (join_keys, None) => reorder_current_join_keys(
                    join_keys,
                    None,
                    right_partition,
                    left_equivalence_properties,
                    right_equivalence_properties,
                ),
                result => result,
            }
        }
        (_, Some(Partitioning::Hash(right_exprs, _))) => {
            try_reorder(join_keys, right_exprs, right_equivalence_properties)
        }
        _ => (join_keys, None),
    }
}

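/// Tries to reorder `join_keys` so that they match the `expected` expressions,
/// either directly or after normalization through equivalence classes. Returns
/// the (possibly reordered) keys together with:
/// - `Some(vec![])` if the keys already match and no change is needed,
/// - `Some(positions)` if a reordering was found, where `positions[i]` is the
///   index of the i-th expected key within the original key lists,
/// - `None` if the keys cannot be matched to `expected`.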
fn try_reorder(
    join_keys: JoinKeyPairs,
    expected: &[Arc<dyn PhysicalExpr>],
    equivalence_properties: &EquivalenceProperties,
) -> (JoinKeyPairs, Option<Vec<usize>>) {
    let eq_groups = equivalence_properties.eq_group();
    let mut normalized_expected = vec![];
    let mut normalized_left_keys = vec![];
    let mut normalized_right_keys = vec![];
    if join_keys.left_keys.len() != expected.len() {
        return (join_keys, None);
    }
    if physical_exprs_equal(expected, &join_keys.left_keys)
        || physical_exprs_equal(expected, &join_keys.right_keys)
    {
        return (join_keys, Some(vec![]));
    } else if !equivalence_properties.eq_group().is_empty() {
        normalized_expected = expected
            .iter()
            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
            .collect();

        normalized_left_keys = join_keys
            .left_keys
            .iter()
            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
            .collect();

        normalized_right_keys = join_keys
            .right_keys
            .iter()
            .map(|e| eq_groups.normalize_expr(Arc::clone(e)))
            .collect();

        if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
            || physical_exprs_equal(&normalized_expected, &normalized_right_keys)
        {
            return (join_keys, Some(vec![]));
        }
    }

    let Some(positions) = expected_expr_positions(&join_keys.left_keys, expected)
        .or_else(|| expected_expr_positions(&join_keys.right_keys, expected))
        .or_else(|| expected_expr_positions(&normalized_left_keys, &normalized_expected))
        .or_else(|| {
            expected_expr_positions(&normalized_right_keys, &normalized_expected)
        })
    else {
        return (join_keys, None);
    };

    let mut new_left_keys = vec![];
    let mut new_right_keys = vec![];
    for pos in positions.iter() {
        new_left_keys.push(Arc::clone(&join_keys.left_keys[*pos]));
        new_right_keys.push(Arc::clone(&join_keys.right_keys[*pos]));
    }
    let pairs = JoinKeyPairs {
        left_keys: new_left_keys,
        right_keys: new_right_keys,
    };

    (pairs, Some(positions))
}

/// Returns the positions of the expected expressions within the current ones.
/// For example, if the current expressions are ['c', 'a', 'a', 'b'] and the
/// expected expressions are ['b', 'c', 'a', 'a'], this method returns
/// Some(vec![3, 0, 1, 2]).
fn expected_expr_positions(
    current: &[Arc<dyn PhysicalExpr>],
    expected: &[Arc<dyn PhysicalExpr>],
) -> Option<Vec<usize>> {
    if current.is_empty() || expected.is_empty() {
        return None;
    }
    let mut indexes: Vec<usize> = vec![];
    let mut current = current.to_vec();
    for expr in expected.iter() {
        // Find the position of the expected expr in the current expressions
        if let Some(expected_position) = current.iter().position(|e| e.eq(expr)) {
            current[expected_position] = Arc::new(NoOp::new());
            indexes.push(expected_position);
        } else {
            return None;
        }
    }
    Some(indexes)
}

fn extract_join_keys(on: &[(PhysicalExprRef, PhysicalExprRef)]) -> JoinKeyPairs {
    let (left_keys, right_keys) = on
        .iter()
        .map(|(l, r)| (Arc::clone(l) as _, Arc::clone(r) as _))
        .unzip();
    JoinKeyPairs {
        left_keys,
        right_keys,
    }
}

fn new_join_conditions(
    new_left_keys: &[Arc<dyn PhysicalExpr>],
    new_right_keys: &[Arc<dyn PhysicalExpr>],
) -> Vec<(PhysicalExprRef, PhysicalExprRef)> {
    new_left_keys
        .iter()
        .zip(new_right_keys.iter())
        .map(|(l_key, r_key)| (Arc::clone(l_key), Arc::clone(r_key)))
        .collect()
}

/// Adds a RoundRobin repartition operator to the plan to increase parallelism.
///
/// # Arguments
///
/// * `input`: Current node.
/// * `n_target`: desired target partition count; if the current partition count
///   is less than this value, it will be increased.
///
/// # Returns
///
/// A [`Result`] object that contains the new execution plan, where the desired
/// partition count is achieved by adding a RoundRobin repartition.
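///
/// As an illustrative sketch (partition counts are hypothetical), with
/// `n_target = 8` and an input producing 2 partitions, the result is:
///
/// ```text
/// RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=2
///   ... input plan ...
/// ```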
fn add_roundrobin_on_top(
    input: DistributionContext,
    n_target: usize,
) -> Result<DistributionContext> {
    // Adding a repartition is helpful:
    if input.plan.output_partitioning().partition_count() < n_target {
        // When there is an existing ordering, we preserve ordering during
        // repartitioning. This will be undone in the future if any of the
        // following conditions is true:
        // - Preserving ordering is not helpful in terms of satisfying ordering
        //   requirements.
        // - Usage of order preserving variants is not desirable (determined
        //   by the flag `config.optimizer.prefer_existing_sort`).
        let partitioning = Partitioning::RoundRobinBatch(n_target);
        let repartition =
            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
                .with_preserve_order();

        let new_plan = Arc::new(repartition) as _;

        Ok(DistributionContext::new(new_plan, true, vec![input]))
    } else {
        // Repartitioning is not helpful; we already have the desired number
        // of partitions.
        Ok(input)
    }
}

/// Adds a hash repartition operator:
/// - to increase parallelism, and/or
/// - to satisfy the requirements of subsequent operators.
///
/// Repartition(Hash) is added on top of the operator `input`.
///
/// # Arguments
///
/// * `input`: Current node.
/// * `hash_exprs`: Physical expressions that are used during hashing.
/// * `n_target`: desired target partition count; if the current partition count
///   is less than this value, it will be increased.
///
/// # Returns
///
/// A [`Result`] object that contains the new execution plan, where the desired
/// distribution is satisfied by adding a Hash repartition.
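///
/// As an illustrative sketch (expressions and partition counts are
/// hypothetical), requiring `Hash([a], 8)` on a 2-partition input yields:
///
/// ```text
/// RepartitionExec: partitioning=Hash([a@0], 8), input_partitions=2
///   ... input plan ...
/// ```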
fn add_hash_on_top(
    input: DistributionContext,
    hash_exprs: Vec<Arc<dyn PhysicalExpr>>,
    n_target: usize,
) -> Result<DistributionContext> {
    // Early return if hash repartition is unnecessary:
    // `RepartitionExec: partitioning=Hash([...], 1), input_partitions=1` is unnecessary.
    if n_target == 1 && input.plan.output_partitioning().partition_count() == 1 {
        return Ok(input);
    }

    let dist = Distribution::HashPartitioned(hash_exprs);
    let satisfied = input
        .plan
        .output_partitioning()
        .satisfy(&dist, input.plan.equivalence_properties());

    // Add hash repartitioning when:
    // - The hash distribution requirement is not satisfied, or
    // - We can increase parallelism by adding hash partitioning.
    if !satisfied || n_target > input.plan.output_partitioning().partition_count() {
        // When there is an existing ordering, we preserve ordering during
        // repartition. This will be rolled back in the future if any of the
        // following conditions is true:
        // - Preserving ordering is not helpful in terms of satisfying ordering
        //   requirements.
        // - Usage of order preserving variants is not desirable (per the flag
        //   `config.optimizer.prefer_existing_sort`).
        let partitioning = dist.create_partitioning(n_target);
        let repartition =
            RepartitionExec::try_new(Arc::clone(&input.plan), partitioning)?
                .with_preserve_order();
        let plan = Arc::new(repartition) as _;

        return Ok(DistributionContext::new(plan, true, vec![input]));
    }

    Ok(input)
}

/// Adds a [`SortPreservingMergeExec`] or a [`CoalescePartitionsExec`] operator
/// on top of the given plan node to satisfy a single partition requirement
/// while preserving ordering constraints.
///
/// # Parameters
///
/// * `input`: Current node.
///
/// # Returns
///
/// Updated node with an execution plan, where the desired single distribution
/// requirement is satisfied.
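///
/// For example (an illustrative sketch), a 4-partition input ordered on
/// `a@0 ASC` gets a `SortPreservingMergeExec: [a@0 ASC]` on top, while an
/// unordered multi-partition input gets a plain `CoalescePartitionsExec`.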
fn add_merge_on_top(input: DistributionContext) -> DistributionContext {
    // Apply only when the partition count is larger than one.
    if input.plan.output_partitioning().partition_count() > 1 {
        // When there is an existing ordering, we preserve ordering while
        // decreasing the partition count. This will be undone in the future
        // if any of the following conditions is true:
        // - Preserving ordering is not helpful in terms of satisfying ordering
        //   requirements.
        // - Usage of order preserving variants is not desirable (determined
        //   by the flag `config.optimizer.prefer_existing_sort`).
        let new_plan = if let Some(req) = input.plan.output_ordering() {
            Arc::new(SortPreservingMergeExec::new(
                req.clone(),
                Arc::clone(&input.plan),
            )) as _
        } else {
            // If there is no input order, we can simply coalesce partitions:
            Arc::new(CoalescePartitionsExec::new(Arc::clone(&input.plan))) as _
        };

        DistributionContext::new(new_plan, true, vec![input])
    } else {
        input
    }
}

/// Updates the physical plan inside a [`DistributionContext`] so that
/// distribution changing operators are removed from the top. If they are
/// necessary, they will be added in subsequent stages.
///
/// Assume that the following plan is given:
/// ```text
/// "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
/// "    DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
///
/// Since `RepartitionExec`s change the distribution, this function removes
/// them and returns the following plan:
///
/// ```text
/// "DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
fn remove_dist_changing_operators(
    mut distribution_context: DistributionContext,
) -> Result<DistributionContext> {
    while is_repartition(&distribution_context.plan)
        || is_coalesce_partitions(&distribution_context.plan)
        || is_sort_preserving_merge(&distribution_context.plan)
    {
        // All of the above operators have a single child, so the first child
        // is the only child. Remove any distribution changing operators at
        // the beginning:
        distribution_context = distribution_context.children.swap_remove(0);
        // Note that they will be re-inserted later on if necessary or helpful.
    }

    Ok(distribution_context)
}

/// Updates the [`DistributionContext`] if preserving ordering while changing
/// partitioning is not helpful or desirable.
///
/// Assume that the following plan is given:
/// ```text
/// "SortPreservingMergeExec: \[a@0 ASC]"
/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10, preserve_order=true",
/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true",
/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
///
/// This function converts the plan above to the following:
///
/// ```text
/// "CoalescePartitionsExec"
/// "  RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=10",
/// "    RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
/// "      DataSourceExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC], file_type=parquet",
/// ```
pub fn replace_order_preserving_variants(
    mut context: DistributionContext,
) -> Result<DistributionContext> {
    context.children = context
        .children
        .into_iter()
        .map(|child| {
            if child.data {
                replace_order_preserving_variants(child)
            } else {
                Ok(child)
            }
        })
        .collect::<Result<Vec<_>>>()?;

    if is_sort_preserving_merge(&context.plan) {
        let child_plan = Arc::clone(&context.children[0].plan);
        context.plan = Arc::new(
            CoalescePartitionsExec::new(child_plan).with_fetch(context.plan.fetch()),
        );
        return Ok(context);
    } else if let Some(repartition) =
        context.plan.as_any().downcast_ref::<RepartitionExec>()
    {
        if repartition.preserve_order() {
            context.plan = Arc::new(RepartitionExec::try_new(
                Arc::clone(&context.children[0].plan),
                repartition.partitioning().clone(),
            )?);
            return Ok(context);
        }
    }

    context.update_plan_from_children()
}

/// A struct to keep track of repartition requirements for each child node.
struct RepartitionRequirementStatus {
    /// The distribution requirement for the node.
    requirement: Distribution,
    /// Designates whether round robin partitioning is theoretically beneficial;
    /// i.e. the operator can actually utilize parallelism.
    roundrobin_beneficial: bool,
    /// Designates whether round robin partitioning is beneficial according to
    /// the statistical information we have on the number of rows.
    roundrobin_beneficial_stats: bool,
    /// Designates whether hash partitioning is necessary.
    hash_necessary: bool,
}

/// Calculates the `RepartitionRequirementStatus` for each child to generate
/// consistent and sensible (in terms of performance) distribution requirements.
/// As an example, a hash join's left (build) child might produce
///
/// ```text
/// RepartitionRequirementStatus {
///     ..,
///     hash_necessary: true
/// }
/// ```
///
/// while its right (probe) child might have very few rows and produce:
///
/// ```text
/// RepartitionRequirementStatus {
///     ..,
///     hash_necessary: false
/// }
/// ```
///
/// These statuses are not consistent, as all children should agree on hash
/// partitioning. This function aligns the statuses to generate consistent
/// hash partitions for each child. After alignment, the right child's status
/// would turn into:
///
/// ```text
/// RepartitionRequirementStatus {
///     ..,
///     hash_necessary: true
/// }
/// ```
fn get_repartition_requirement_status(
    plan: &Arc<dyn ExecutionPlan>,
    batch_size: usize,
    should_use_estimates: bool,
) -> Result<Vec<RepartitionRequirementStatus>> {
    let mut needs_alignment = false;
    let children = plan.children();
    let rr_beneficial = plan.benefits_from_input_partitioning();
    let requirements = plan.required_input_distribution();
    let mut repartition_status_flags = vec![];
    for (child, requirement, roundrobin_beneficial) in
        izip!(children.into_iter(), requirements, rr_beneficial)
    {
        // Decide whether adding a round robin is beneficial depending on
        // the statistical information we have on the number of rows:
        let roundrobin_beneficial_stats = match child.partition_statistics(None)?.num_rows
        {
            Precision::Exact(n_rows) => n_rows > batch_size,
            Precision::Inexact(n_rows) => !should_use_estimates || (n_rows > batch_size),
            Precision::Absent => true,
        };
        let is_hash = matches!(requirement, Distribution::HashPartitioned(_));
        // Hash re-partitioning is necessary when the input has more than one
        // partition:
        let multi_partitions = child.output_partitioning().partition_count() > 1;
        let roundrobin_sensible = roundrobin_beneficial && roundrobin_beneficial_stats;
        needs_alignment |= is_hash && (multi_partitions || roundrobin_sensible);
        repartition_status_flags.push((
            is_hash,
            RepartitionRequirementStatus {
                requirement,
                roundrobin_beneficial,
                roundrobin_beneficial_stats,
                hash_necessary: is_hash && multi_partitions,
            },
        ));
    }
    // Align hash necessary flags for hash partitions to generate consistent
    // hash partitions for each child:
    if needs_alignment {
        // When there is at least one hash requirement that is necessary or
        // beneficial according to statistics, make all children require hash
        // repartitioning:
        for (is_hash, status) in &mut repartition_status_flags {
            if *is_hash {
                status.hash_necessary = true;
            }
        }
    }
    Ok(repartition_status_flags
        .into_iter()
        .map(|(_, status)| status)
        .collect())
}

/// This function checks whether we need to add additional data exchange
/// operators to satisfy distribution requirements. Since this function
/// takes care of such requirements, we should avoid manually adding data
/// exchange operators in other places.
///
/// This function is intended to be used in a bottom-up traversal, as it can
/// first repartition (or newly partition) at the datasources; these source
/// partitions may be repartitioned later by additional data exchange operators.
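///
/// As an illustrative before/after sketch (operator details elided, partition
/// counts hypothetical), a hash aggregate over a single-partition source such as
///
/// ```text
/// AggregateExec: mode=FinalPartitioned, gby=[a@0 as a]
///   AggregateExec: mode=Partial, gby=[a@0 as a]
///     DataSourceExec: ...
/// ```
///
/// may be rewritten into:
///
/// ```text
/// AggregateExec: mode=FinalPartitioned, gby=[a@0 as a]
///   RepartitionExec: partitioning=Hash([a@0], 8), input_partitions=8
///     AggregateExec: mode=Partial, gby=[a@0 as a]
///       RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
///         DataSourceExec: ...
/// ```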
pub fn ensure_distribution(
    dist_context: DistributionContext,
    config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
    let dist_context = update_children(dist_context)?;

    if dist_context.plan.children().is_empty() {
        return Ok(Transformed::no(dist_context));
    }

    let target_partitions = config.execution.target_partitions;
    // When `false`, round robin repartitioning will not be added to increase parallelism
    let enable_round_robin = config.optimizer.enable_round_robin_repartition;
    let repartition_file_scans = config.optimizer.repartition_file_scans;
    let batch_size = config.execution.batch_size;
    let should_use_estimates = config
        .execution
        .use_row_number_estimates_to_optimize_partitioning;
    let unbounded_and_pipeline_friendly = dist_context.plan.boundedness().is_unbounded()
        && matches!(
            dist_context.plan.pipeline_behavior(),
            EmissionType::Incremental | EmissionType::Both
        );
    // Use order preserving variants if either of the following conditions is true:
    // - It is desired according to the config (`prefer_existing_sort`).
    // - The plan is unbounded and pipeline friendly (i.e. it can incrementally
    //   produce results).
    let order_preserving_variants_desirable =
        unbounded_and_pipeline_friendly || config.optimizer.prefer_existing_sort;

    // Remove unnecessary repartition from the physical plan if any
    let DistributionContext {
        mut plan,
        data,
        children,
    } = remove_dist_changing_operators(dist_context)?;

    if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    } else if let Some(exec) = plan.as_any().downcast_ref::<BoundedWindowAggExec>() {
        if let Some(updated_window) = get_best_fitting_window(
            exec.window_expr(),
            exec.input(),
            &exec.partition_keys(),
        )? {
            plan = updated_window;
        }
    };

    let repartition_status_flags =
        get_repartition_requirement_status(&plan, batch_size, should_use_estimates)?;
    // This loop iterates over all the children to:
    // - Increase parallelism for every child if it is beneficial.
    // - Satisfy the distribution requirements of every child, if it is not
    //   already satisfied.
    // We store the updated children in `children`.
    let children = izip!(
        children.into_iter(),
        plan.required_input_ordering(),
        plan.maintains_input_order(),
        repartition_status_flags.into_iter()
    )
    .map(
        |(
            mut child,
            required_input_ordering,
            maintains,
            RepartitionRequirementStatus {
                requirement,
                roundrobin_beneficial,
                roundrobin_beneficial_stats,
                hash_necessary,
            },
        )| {
            let add_roundrobin = enable_round_robin
                // Operator benefits from partitioning (e.g. filter):
                && roundrobin_beneficial
                && roundrobin_beneficial_stats
                // Unless repartitioning increases the partition count, it is not beneficial:
                && child.plan.output_partitioning().partition_count() < target_partitions;

            // When `repartition_file_scans` is set, attempt to increase
            // parallelism at the source.
            //
            // If repartitioning is not possible (i.e. `None` is returned from
            // `ExecutionPlan::repartitioned`), then no repartitioning will have
            // occurred. As the default implementation returns `None`, only
            // specific physical plan nodes, such as certain datasources, are
            // repartitioned.
            if repartition_file_scans && roundrobin_beneficial_stats {
                if let Some(new_child) =
                    child.plan.repartitioned(target_partitions, config)?
                {
                    child.plan = new_child;
                }
            }

            // Satisfy the distribution requirement if it is unmet.
            match &requirement {
                Distribution::SinglePartition => {
                    child = add_merge_on_top(child);
                }
                Distribution::HashPartitioned(exprs) => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                    // When inserting hash is necessary to satisfy the hash
                    // requirement, insert a hash repartition:
                    if hash_necessary {
                        child =
                            add_hash_on_top(child, exprs.to_vec(), target_partitions)?;
                    }
                }
                Distribution::UnspecifiedDistribution => {
                    if add_roundrobin {
                        // Add round-robin repartitioning on top of the operator
                        // to increase parallelism.
                        child = add_roundrobin_on_top(child, target_partitions)?;
                    }
                }
            };

            // There is an ordering requirement of the operator:
            if let Some(required_input_ordering) = required_input_ordering {
                // Either:
                // - the ordering requirement cannot be satisfied by preserving
                //   ordering through repartitions, or
                // - using an order preserving variant is not desirable.
                let sort_req = required_input_ordering.into_single();
                let ordering_satisfied = child
                    .plan
                    .equivalence_properties()
                    .ordering_satisfy_requirement(sort_req.clone())?;

                if (!ordering_satisfied || !order_preserving_variants_desirable)
                    && child.data
                {
                    child = replace_order_preserving_variants(child)?;
                    // If ordering requirements were satisfied before repartitioning,
                    // make sure ordering requirements are still satisfied after.
                    if ordering_satisfied {
                        // Make sure to satisfy ordering requirement:
                        child = add_sort_above_with_check(
                            child,
                            sort_req,
                            plan.as_any()
                                .downcast_ref::<OutputRequirementExec>()
                                .map(|output| output.fetch())
                                .unwrap_or(None),
                        )?;
                    }
                }
                // Stop tracking distribution changing operators
                child.data = false;
            } else {
                // No ordering requirement:
                match requirement {
                    // Operator requires a specific distribution.
                    Distribution::SinglePartition | Distribution::HashPartitioned(_) => {
                        // Since there is no ordering requirement, preserving
                        // ordering is pointless:
                        child = replace_order_preserving_variants(child)?;
                    }
                    Distribution::UnspecifiedDistribution => {
                        // Since ordering is lost, trying to preserve ordering
                        // is pointless:
                        if !maintains || plan.as_any().is::<OutputRequirementExec>() {
                            child = replace_order_preserving_variants(child)?;
                        }
                    }
                }
            }
            Ok(child)
        },
    )
    .collect::<Result<Vec<_>>>()?;

    let children_plans = children
        .iter()
        .map(|c| Arc::clone(&c.plan))
        .collect::<Vec<_>>();

    plan = if plan.as_any().is::<UnionExec>()
        && !config.optimizer.prefer_existing_union
        && can_interleave(children_plans.iter())
    {
        // Add a special case for [`UnionExec`] since we want to "bubble up"
        // hash-partitioned data. So instead of
        //
        // Agg:
        //   Repartition (hash):
        //     Union:
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //       - Agg:
        //           Repartition (hash):
        //             Data
        //
        // we can use:
        //
        // Agg:
        //   Interleave:
        //     - Agg:
        //         Repartition (hash):
        //           Data
        //     - Agg:
        //         Repartition (hash):
        //           Data
        Arc::new(InterleaveExec::try_new(children_plans)?)
    } else {
        plan.with_new_children(children_plans)?
    };

    Ok(Transformed::yes(DistributionContext::new(
        plan, data, children,
    )))
}

/// Keeps track of distribution changing operators (like `RepartitionExec`,
/// `SortPreservingMergeExec`, `CoalescePartitionsExec`) and their ancestors.
/// Using this information, we can optimize distribution of the plan if/when
/// necessary.
pub type DistributionContext = PlanContext<bool>;

/// Refreshes the distribution-changing flag (the boolean `data` field) for
/// every child of the given node. A child is flagged when it is itself a
/// distribution changing operator (a `RepartitionExec` with a known
/// partitioning, a `SortPreservingMergeExec` or a `CoalescePartitionsExec`),
/// when it is a leaf, or when it transitively contains a flagged node beneath
/// an unspecified distribution requirement.
fn update_children(mut dist_context: DistributionContext) -> Result<DistributionContext> {
    for child_context in dist_context.children.iter_mut() {
        let child_plan_any = child_context.plan.as_any();
        child_context.data =
            if let Some(repartition) = child_plan_any.downcast_ref::<RepartitionExec>() {
                !matches!(
                    repartition.partitioning(),
                    Partitioning::UnknownPartitioning(_)
                )
            } else {
                child_plan_any.is::<SortPreservingMergeExec>()
                    || child_plan_any.is::<CoalescePartitionsExec>()
                    || child_context.plan.children().is_empty()
                    || child_context.children[0].data
                    || child_context
                        .plan
                        .required_input_distribution()
                        .iter()
                        .zip(child_context.children.iter())
                        .any(|(required_dist, child_context)| {
                            child_context.data
                                && matches!(
                                    required_dist,
                                    Distribution::UnspecifiedDistribution
                                )
                        })
            }
    }

    dist_context.data = false;
    Ok(dist_context)
}

// See tests in datafusion/core/tests/physical_optimizer