datafusion_physical_optimizer/
limited_distinct_aggregation.rs1use std::sync::Arc;
22
23use datafusion_physical_plan::aggregates::AggregateExec;
24use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
25use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
26
27use datafusion_common::config::ConfigOptions;
28use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
29use datafusion_common::Result;
30
31use crate::PhysicalOptimizerRule;
32use itertools::Itertools;
33
/// Physical optimizer rule that looks for a `LocalLimitExec` or
/// `GlobalLimitExec` and hands its row count down, as a limit hint, to the
/// `AggregateExec` nodes directly beneath it.
///
/// Only aggregates for which
/// `AggregateExec::is_unordered_unfiltered_group_by_distinct` returns `true`
/// receive the hint. The rule does nothing unless the
/// `optimizer.enable_distinct_aggregation_soft_limit` option is enabled.
#[derive(Debug)]
pub struct LimitedDistinctAggregation {}
40
41impl LimitedDistinctAggregation {
42 pub fn new() -> Self {
44 Self {}
45 }
46
47 fn transform_agg(
48 aggr: &AggregateExec,
49 limit: usize,
50 ) -> Option<Arc<dyn ExecutionPlan>> {
51 if !aggr.is_unordered_unfiltered_group_by_distinct() {
53 return None;
54 }
55
56 let new_aggr = AggregateExec::try_new(
58 *aggr.mode(),
59 aggr.group_expr().clone(),
60 aggr.aggr_expr().to_vec(),
61 aggr.filter_expr().to_vec(),
62 aggr.input().to_owned(),
63 aggr.input_schema(),
64 )
65 .expect("Unable to copy Aggregate!")
66 .with_limit(Some(limit));
67 Some(Arc::new(new_aggr))
68 }
69
70 fn transform_limit(plan: Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
75 let limit: usize;
76 let mut global_fetch: Option<usize> = None;
77 let mut global_skip: usize = 0;
78 let children: Vec<Arc<dyn ExecutionPlan>>;
79 let mut is_global_limit = false;
80 if let Some(local_limit) = plan.as_any().downcast_ref::<LocalLimitExec>() {
81 limit = local_limit.fetch();
82 children = local_limit.children().into_iter().cloned().collect();
83 } else if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>()
84 {
85 global_fetch = global_limit.fetch();
86 global_fetch?;
87 global_skip = global_limit.skip();
88 limit = global_fetch.unwrap() + global_skip;
90 children = global_limit.children().into_iter().cloned().collect();
91 is_global_limit = true
92 } else {
93 return None;
94 }
95 let child = children.iter().exactly_one().ok()?;
96 if plan.output_ordering().is_some() {
98 return None;
99 }
100 if plan.required_input_ordering()[0].is_some() {
102 return None;
103 }
104
105 let mut match_aggr: Arc<dyn ExecutionPlan> = plan;
108 let mut found_match_aggr = false;
109
110 let mut rewrite_applicable = true;
111 let closure = |plan: Arc<dyn ExecutionPlan>| {
112 if !rewrite_applicable {
113 return Ok(Transformed::no(plan));
114 }
115 if let Some(aggr) = plan.as_any().downcast_ref::<AggregateExec>() {
116 if found_match_aggr {
117 if let Some(parent_aggr) =
118 match_aggr.as_any().downcast_ref::<AggregateExec>()
119 {
120 if !parent_aggr.group_expr().eq(aggr.group_expr()) {
121 rewrite_applicable = false;
124 return Ok(Transformed::no(plan));
125 }
126 }
127 }
128 match Self::transform_agg(aggr, limit) {
131 None => {}
132 Some(new_aggr) => {
133 match_aggr = plan;
134 found_match_aggr = true;
135 return Ok(Transformed::yes(new_aggr));
136 }
137 }
138 }
139 rewrite_applicable = false;
140 Ok(Transformed::no(plan))
141 };
142 let child = child.to_owned().transform_down(closure).data().ok()?;
143 if is_global_limit {
144 return Some(Arc::new(GlobalLimitExec::new(
145 child,
146 global_skip,
147 global_fetch,
148 )));
149 }
150 Some(Arc::new(LocalLimitExec::new(child, limit)))
151 }
152}
153
154impl Default for LimitedDistinctAggregation {
155 fn default() -> Self {
156 Self::new()
157 }
158}
159
160impl PhysicalOptimizerRule for LimitedDistinctAggregation {
161 fn optimize(
162 &self,
163 plan: Arc<dyn ExecutionPlan>,
164 config: &ConfigOptions,
165 ) -> Result<Arc<dyn ExecutionPlan>> {
166 if config.optimizer.enable_distinct_aggregation_soft_limit {
167 plan.transform_down(|plan| {
168 Ok(
169 if let Some(plan) =
170 LimitedDistinctAggregation::transform_limit(plan.to_owned())
171 {
172 Transformed::yes(plan)
173 } else {
174 Transformed::no(plan)
175 },
176 )
177 })
178 .data()
179 } else {
180 Ok(plan)
181 }
182 }
183
184 fn name(&self) -> &str {
185 "LimitedDistinctAggregation"
186 }
187
188 fn schema_check(&self) -> bool {
189 true
190 }
191}
192
193