datafusion_physical_optimizer/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion_common::Result;
21use datafusion_physical_expr::{LexOrdering, LexRequirement};
22use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
23use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
24use datafusion_physical_plan::repartition::RepartitionExec;
25use datafusion_physical_plan::sorts::sort::SortExec;
26use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
27use datafusion_physical_plan::tree_node::PlanContext;
28use datafusion_physical_plan::union::UnionExec;
29use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
30use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
31
32/// This utility function adds a `SortExec` above an operator according to the
33/// given ordering requirements while preserving the original partitioning.
34///
35/// Note that this updates the plan in both the `PlanContext.children` and
36/// the `PlanContext.plan`'s children. Therefore its not required to sync
37/// the child plans with [`PlanContext::update_plan_from_children`].
38pub fn add_sort_above<T: Clone + Default>(
39    node: PlanContext<T>,
40    sort_requirements: LexRequirement,
41    fetch: Option<usize>,
42) -> PlanContext<T> {
43    let mut sort_reqs: Vec<_> = sort_requirements.into();
44    sort_reqs.retain(|sort_expr| {
45        node.plan
46            .equivalence_properties()
47            .is_expr_constant(&sort_expr.expr)
48            .is_none()
49    });
50    let sort_exprs = sort_reqs.into_iter().map(Into::into).collect::<Vec<_>>();
51    let Some(ordering) = LexOrdering::new(sort_exprs) else {
52        return node;
53    };
54    let mut new_sort = SortExec::new(ordering, Arc::clone(&node.plan)).with_fetch(fetch);
55    if node.plan.output_partitioning().partition_count() > 1 {
56        new_sort = new_sort.with_preserve_partitioning(true);
57    }
58    PlanContext::new(Arc::new(new_sort), T::default(), vec![node])
59}
60
61/// This utility function adds a `SortExec` above an operator according to the
62/// given ordering requirements while preserving the original partitioning. If
63/// requirement is already satisfied no `SortExec` is added.
64pub fn add_sort_above_with_check<T: Clone + Default>(
65    node: PlanContext<T>,
66    sort_requirements: LexRequirement,
67    fetch: Option<usize>,
68) -> Result<PlanContext<T>> {
69    if !node
70        .plan
71        .equivalence_properties()
72        .ordering_satisfy_requirement(sort_requirements.clone())?
73    {
74        Ok(add_sort_above(node, sort_requirements, fetch))
75    } else {
76        Ok(node)
77    }
78}
79
80/// Checks whether the given operator is a [`SortExec`].
81pub fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
82    plan.as_any().is::<SortExec>()
83}
84
85/// Checks whether the given operator is a window;
86/// i.e. either a [`WindowAggExec`] or a [`BoundedWindowAggExec`].
87pub fn is_window(plan: &Arc<dyn ExecutionPlan>) -> bool {
88    plan.as_any().is::<WindowAggExec>() || plan.as_any().is::<BoundedWindowAggExec>()
89}
90
91/// Checks whether the given operator is a [`UnionExec`].
92pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
93    plan.as_any().is::<UnionExec>()
94}
95
96/// Checks whether the given operator is a [`SortPreservingMergeExec`].
97pub fn is_sort_preserving_merge(plan: &Arc<dyn ExecutionPlan>) -> bool {
98    plan.as_any().is::<SortPreservingMergeExec>()
99}
100
101/// Checks whether the given operator is a [`CoalescePartitionsExec`].
102pub fn is_coalesce_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
103    plan.as_any().is::<CoalescePartitionsExec>()
104}
105
106/// Checks whether the given operator is a [`RepartitionExec`].
107pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
108    plan.as_any().is::<RepartitionExec>()
109}
110
111/// Checks whether the given operator is a limit;
112/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
113pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
114    plan.as_any().is::<GlobalLimitExec>() || plan.as_any().is::<LocalLimitExec>()
115}