datafusion_physical_optimizer/
update_aggr_exprs.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An optimizer rule that checks ordering requirements of aggregate expressions
19//! and modifies the expressions to work more efficiently if possible.
20
21use std::sync::Arc;
22
23use datafusion_common::config::ConfigOptions;
24use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25use datafusion_common::{plan_datafusion_err, Result};
26use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
27use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
28use datafusion_physical_plan::aggregates::{concat_slices, AggregateExec};
29use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
30use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
31
32use crate::PhysicalOptimizerRule;
33
/// This optimizer rule checks ordering requirements of aggregate expressions.
///
/// There are 3 kinds of aggregators in terms of ordering requirements:
/// - `AggregateOrderSensitivity::Insensitive`, meaning that ordering is not
///   important.
/// - `AggregateOrderSensitivity::HardRequirement`, meaning that the aggregator
///   requires a specific ordering.
/// - `AggregateOrderSensitivity::Beneficial`, meaning that the aggregator can
///   handle unordered input, but can run more efficiently if its input conforms
///   to a specific ordering.
///
/// This rule analyzes aggregate expressions of type `Beneficial` to see whether
/// their input ordering requirements are satisfied. If this is the case, the
/// aggregators are modified to run in a more efficient mode.
///
/// Only first-stage aggregations are rewritten; later stages do not rely on
/// input ordering.
#[derive(Default, Debug)]
pub struct OptimizeAggregateOrder {}

impl OptimizeAggregateOrder {
    /// Creates a new `OptimizeAggregateOrder` rule.
    ///
    /// The rule carries no state, so this is equivalent to
    /// [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
57
58impl PhysicalOptimizerRule for OptimizeAggregateOrder {
59    /// Applies the `OptimizeAggregateOrder` rule to the provided execution plan.
60    ///
61    /// This function traverses the execution plan tree, identifies `AggregateExec` nodes,
62    /// and optimizes their aggregate expressions based on existing input orderings.
63    /// If optimizations are applied, it returns a modified execution plan.
64    ///
65    /// # Arguments
66    ///
67    /// * `plan` - The root of the execution plan to optimize.
68    /// * `_config` - Configuration options (currently unused).
69    ///
70    /// # Returns
71    ///
72    /// A `Result` containing the potentially optimized execution plan or an error.
73    fn optimize(
74        &self,
75        plan: Arc<dyn ExecutionPlan>,
76        _config: &ConfigOptions,
77    ) -> Result<Arc<dyn ExecutionPlan>> {
78        plan.transform_up(|plan| {
79            if let Some(aggr_exec) = plan.as_any().downcast_ref::<AggregateExec>() {
80                // Final stage implementations do not rely on ordering -- those
81                // ordering fields may be pruned out by first stage aggregates.
82                // Hence, necessary information for proper merge is added during
83                // the first stage to the state field, which the final stage uses.
84                if !aggr_exec.mode().is_first_stage() {
85                    return Ok(Transformed::no(plan));
86                }
87                let input = aggr_exec.input();
88                let mut aggr_exprs = aggr_exec.aggr_expr().to_vec();
89
90                let groupby_exprs = aggr_exec.group_expr().input_exprs();
91                // If the existing ordering satisfies a prefix of the GROUP BY
92                // expressions, prefix requirements with this section. In this
93                // case, aggregation will work more efficiently.
94                let indices = get_ordered_partition_by_indices(&groupby_exprs, input)?;
95                let requirement = indices
96                    .iter()
97                    .map(|&idx| {
98                        PhysicalSortRequirement::new(
99                            Arc::clone(&groupby_exprs[idx]),
100                            None,
101                        )
102                    })
103                    .collect::<Vec<_>>();
104
105                aggr_exprs = try_convert_aggregate_if_better(
106                    aggr_exprs,
107                    &requirement,
108                    input.equivalence_properties(),
109                )?;
110
111                let aggr_exec = aggr_exec.with_new_aggr_exprs(aggr_exprs);
112
113                Ok(Transformed::yes(Arc::new(aggr_exec) as _))
114            } else {
115                Ok(Transformed::no(plan))
116            }
117        })
118        .data()
119    }
120
121    fn name(&self) -> &str {
122        "OptimizeAggregateOrder"
123    }
124
125    fn schema_check(&self) -> bool {
126        true
127    }
128}
129
130/// Tries to convert each aggregate expression to a potentially more efficient
131/// version.
132///
133/// # Parameters
134///
135/// * `aggr_exprs` - A vector of `AggregateFunctionExpr` representing the
136///   aggregate expressions to be optimized.
137/// * `prefix_requirement` - An array slice representing the ordering
138///   requirements preceding the aggregate expressions.
139/// * `eq_properties` - A reference to the `EquivalenceProperties` object
140///   containing ordering information.
141///
142/// # Returns
143///
144/// Returns `Ok(converted_aggr_exprs)` if the conversion process completes
145/// successfully. Any errors occurring during the conversion process are
146/// passed through.
147fn try_convert_aggregate_if_better(
148    aggr_exprs: Vec<Arc<AggregateFunctionExpr>>,
149    prefix_requirement: &[PhysicalSortRequirement],
150    eq_properties: &EquivalenceProperties,
151) -> Result<Vec<Arc<AggregateFunctionExpr>>> {
152    aggr_exprs
153        .into_iter()
154        .map(|aggr_expr| {
155            let order_bys = aggr_expr.order_bys();
156            // If the aggregate expression benefits from input ordering, and
157            // there is an actual ordering enabling this, try to update the
158            // aggregate expression to benefit from the existing ordering.
159            // Otherwise, leave it as is.
160            if !aggr_expr.order_sensitivity().is_beneficial() {
161                Ok(aggr_expr)
162            } else if !order_bys.is_empty() {
163                if eq_properties.ordering_satisfy_requirement(concat_slices(
164                    prefix_requirement,
165                    &order_bys
166                        .iter()
167                        .map(|e| e.clone().into())
168                        .collect::<Vec<_>>(),
169                ))? {
170                    // Existing ordering satisfies the aggregator requirements:
171                    aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
172                } else if eq_properties.ordering_satisfy_requirement(concat_slices(
173                    prefix_requirement,
174                    &order_bys
175                        .iter()
176                        .map(|e| e.reverse().into())
177                        .collect::<Vec<_>>(),
178                ))? {
179                    // Converting to reverse enables more efficient execution
180                    // given the existing ordering (if possible):
181                    aggr_expr
182                        .reverse_expr()
183                        .map(Arc::new)
184                        .unwrap_or(aggr_expr)
185                        .with_beneficial_ordering(true)?
186                        .map(Arc::new)
187                } else {
188                    // There is no beneficial ordering present -- aggregation
189                    // will still work albeit in a less efficient mode.
190                    aggr_expr.with_beneficial_ordering(false)?.map(Arc::new)
191                }
192                .ok_or_else(|| {
193                    plan_datafusion_err!(
194                    "Expects an aggregate expression that can benefit from input ordering"
195                )
196                })
197            } else {
198                Ok(aggr_expr)
199            }
200        })
201        .collect()
202}