datafusion_physical_optimizer/
limit_pushdown_past_window.rs1use crate::PhysicalOptimizerRule;
19use datafusion_common::config::ConfigOptions;
20use datafusion_common::tree_node::{Transformed, TreeNode};
21use datafusion_common::ScalarValue;
22use datafusion_expr::{LimitEffect, WindowFrameBound, WindowFrameUnits};
23use datafusion_physical_expr::window::{
24 PlainAggregateWindowExpr, SlidingAggregateWindowExpr, StandardWindowExpr,
25 StandardWindowFunctionExpr, WindowExpr,
26};
27use datafusion_physical_plan::execution_plan::CardinalityEffect;
28use datafusion_physical_plan::limit::GlobalLimitExec;
29use datafusion_physical_plan::repartition::RepartitionExec;
30use datafusion_physical_plan::sorts::sort::SortExec;
31use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
32use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr};
33use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
34use std::cmp;
35use std::sync::Arc;
36
/// Physical optimizer rule that carries a limit past [`BoundedWindowAggExec`]
/// nodes.
///
/// While walking the plan top-down, the rule records the fetch found on a
/// `GlobalLimitExec` or `SortPreservingMergeExec`, widens it by the number of
/// extra rows the window expressions need, and sets the result as the fetch
/// of the first `SortExec` / `SortPreservingMergeExec` found below the window.
/// The rule is a no-op unless `optimizer.enable_window_limits` is set.
#[derive(Clone, Debug, Default)]
pub struct LimitPushPastWindows;

impl LimitPushPastWindows {
    /// Creates the rule. The type carries no state, so this is the same value
    /// that [`Default::default`] produces.
    pub fn new() -> Self {
        Self::default()
    }
}
49
/// Stage of the top-down traversal performed by the rule.
#[derive(PartialEq, Eq)]
enum Phase {
    /// No window node has been reached yet; limit-carrying nodes still
    /// update the tracked limit.
    FindOrGrow,
    /// A `BoundedWindowAggExec` has been seen; sort nodes encountered from
    /// here on are candidates for receiving the fetch.
    Apply,
}
55
/// Mutable bookkeeping threaded through the plan traversal.
#[derive(Default)]
struct TraverseState {
    /// Limit currently being tracked, or `None` when there is nothing to push.
    pub limit: Option<usize>,
    /// Number of rows beyond `limit` that must be kept.
    pub lookahead: usize,
}

impl TraverseState {
    /// Replaces the tracked limit with `limit` and clears the lookahead.
    pub fn reset_limit(&mut self, limit: Option<usize>) {
        self.lookahead = 0;
        self.limit = limit;
    }

    /// Raises the lookahead to `new_val` when that is larger than the
    /// current value; otherwise leaves it unchanged.
    pub fn max_lookahead(&mut self, new_val: usize) {
        if new_val > self.lookahead {
            self.lookahead = new_val;
        }
    }
}
72
73impl PhysicalOptimizerRule for LimitPushPastWindows {
74 fn optimize(
75 &self,
76 original: Arc<dyn ExecutionPlan>,
77 config: &ConfigOptions,
78 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
79 if !config.optimizer.enable_window_limits {
80 return Ok(original);
81 }
82 let mut ctx = TraverseState::default();
83 let mut phase = Phase::FindOrGrow;
84 let result = original.transform_down(|node| {
85 let reset = |node,
87 ctx: &mut TraverseState|
88 -> datafusion_common::Result<
89 Transformed<Arc<dyn ExecutionPlan>>,
90 > {
91 ctx.limit = None;
92 ctx.lookahead = 0;
93 Ok(Transformed::no(node))
94 };
95
96 if node.children().len() > 1 {
98 return reset(node, &mut ctx);
99 }
100
101 if phase == Phase::FindOrGrow && get_limit(&node, &mut ctx) {
103 return Ok(Transformed::no(node));
104 }
105
106 if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
108 phase = Phase::Apply;
109 if !grow_limit(window, &mut ctx) {
110 return reset(node, &mut ctx);
111 }
112 return Ok(Transformed::no(node));
113 }
114
115 if phase == Phase::Apply {
117 if let Some(out) = apply_limit(&node, &mut ctx) {
118 return Ok(out);
119 }
120 }
121
122 if !node.supports_limit_pushdown() {
124 return reset(node, &mut ctx);
125 }
126 if let Some(part) = node.as_any().downcast_ref::<RepartitionExec>() {
127 let output = part.partitioning().partition_count();
128 let input = part.input().output_partitioning().partition_count();
129 if output < input {
130 return reset(node, &mut ctx);
131 }
132 }
133 match node.cardinality_effect() {
134 CardinalityEffect::Unknown => return reset(node, &mut ctx),
135 CardinalityEffect::LowerEqual => return reset(node, &mut ctx),
136 CardinalityEffect::Equal => {}
137 CardinalityEffect::GreaterEqual => {}
138 }
139
140 Ok(Transformed::no(node))
141 })?;
142 Ok(result.data)
143 }
144
145 fn name(&self) -> &str {
146 "LimitPushPastWindows"
147 }
148
149 fn schema_check(&self) -> bool {
150 false }
152}
153
154fn grow_limit(window: &BoundedWindowAggExec, ctx: &mut TraverseState) -> bool {
155 let mut max_rel = 0;
156 for expr in window.window_expr().iter() {
157 match get_limit_effect(expr) {
159 LimitEffect::None => {}
160 LimitEffect::Unknown => return false,
161 LimitEffect::Relative(rel) => max_rel = max_rel.max(rel),
162 LimitEffect::Absolute(val) => {
163 let cur = ctx.limit.unwrap_or(0);
164 ctx.limit = Some(cur.max(val))
165 }
166 }
167
168 let frame = expr.get_window_frame();
170 if frame.units != WindowFrameUnits::Rows {
171 return false; }
173 let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
174 return false; };
176 ctx.max_lookahead(end_bound);
177 }
178
179 ctx.max_lookahead(ctx.lookahead + max_rel);
181 true
182}
183
184fn apply_limit(
185 node: &Arc<dyn ExecutionPlan>,
186 ctx: &mut TraverseState,
187) -> Option<Transformed<Arc<dyn ExecutionPlan>>> {
188 if !node.as_any().is::<SortExec>() && !node.as_any().is::<SortPreservingMergeExec>() {
189 return None;
190 }
191 let latest = ctx.limit.take();
192 let Some(fetch) = latest else {
193 ctx.limit = None;
194 ctx.lookahead = 0;
195 return Some(Transformed::no(Arc::clone(node)));
196 };
197 let fetch = match node.fetch() {
198 None => fetch + ctx.lookahead,
199 Some(existing) => cmp::min(existing, fetch + ctx.lookahead),
200 };
201 Some(Transformed::complete(node.with_fetch(Some(fetch)).unwrap()))
202}
203
204fn get_limit(node: &Arc<dyn ExecutionPlan>, ctx: &mut TraverseState) -> bool {
205 if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
206 ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip()));
207 return true;
208 }
209 if let Some(limit) = node.as_any().downcast_ref::<SortPreservingMergeExec>() {
210 ctx.reset_limit(limit.fetch());
211 return true;
212 }
213 false
214}
215
216fn get_limit_effect(expr: &Arc<dyn WindowExpr>) -> LimitEffect {
229 if expr.as_any().is::<PlainAggregateWindowExpr>()
231 || expr.as_any().is::<SlidingAggregateWindowExpr>()
232 {
233 return LimitEffect::None;
234 }
235
236 let Some(swe) = expr.as_any().downcast_ref::<StandardWindowExpr>() else {
238 return LimitEffect::Unknown; };
240 let swfe = swe.get_standard_func_expr();
241 let Some(udf) = swfe.as_any().downcast_ref::<WindowUDFExpr>() else {
242 return LimitEffect::Unknown; };
244 udf.limit_effect()
245}
246
247fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
248 match bound {
249 WindowFrameBound::Preceding(_) => Some(0),
250 WindowFrameBound::CurrentRow => Some(0),
251 WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
252 Some(*scalar as usize)
253 }
254 _ => None,
255 }
256}