datafusion_physical_optimizer/
limit_pushdown.rs1use std::fmt::Debug;
22use std::sync::Arc;
23
24use crate::PhysicalOptimizerRule;
25
26use datafusion_common::config::ConfigOptions;
27use datafusion_common::error::Result;
28use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
29use datafusion_common::utils::combine_limit;
30use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
31use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
32use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
33use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// Physical optimizer rule that pushes limit (`fetch`/`skip`) requirements
/// down an execution plan.
///
/// The rule itself holds no configuration or state; the information gathered
/// while rewriting a plan is carried in [`GlobalRequirements`].
#[derive(Debug, Default)]
pub struct LimitPushdown {}
38
/// Limit bookkeeping threaded through the plan while [`LimitPushdown`] runs.
///
/// The default value (`fetch: None`, `skip: 0`, `satisfied: false`) means
/// "no limit has been seen yet".
#[derive(Debug, Clone, Default)]
pub struct GlobalRequirements {
    /// Maximum number of rows still required, or `None` when no fetch applies.
    fetch: Option<usize>,
    /// Number of leading rows that still have to be skipped.
    skip: usize,
    /// Set once the current requirement has been placed into the plan, so
    /// that nodes further down do not add another limit for it.
    satisfied: bool,
}
54
55impl LimitPushdown {
56 #[allow(missing_docs)]
57 pub fn new() -> Self {
58 Self {}
59 }
60}
61
62impl PhysicalOptimizerRule for LimitPushdown {
63 fn optimize(
64 &self,
65 plan: Arc<dyn ExecutionPlan>,
66 _config: &ConfigOptions,
67 ) -> Result<Arc<dyn ExecutionPlan>> {
68 let global_state = GlobalRequirements {
69 fetch: None,
70 skip: 0,
71 satisfied: false,
72 };
73 pushdown_limits(plan, global_state)
74 }
75
76 fn name(&self) -> &str {
77 "LimitPushdown"
78 }
79
80 fn schema_check(&self) -> bool {
81 true
82 }
83}
84
85#[derive(Debug)]
88pub enum LimitExec {
89 Global(GlobalLimitExec),
90 Local(LocalLimitExec),
91}
92
93impl LimitExec {
94 fn input(&self) -> &Arc<dyn ExecutionPlan> {
95 match self {
96 Self::Global(global) => global.input(),
97 Self::Local(local) => local.input(),
98 }
99 }
100
101 fn fetch(&self) -> Option<usize> {
102 match self {
103 Self::Global(global) => global.fetch(),
104 Self::Local(local) => Some(local.fetch()),
105 }
106 }
107
108 fn skip(&self) -> usize {
109 match self {
110 Self::Global(global) => global.skip(),
111 Self::Local(_) => 0,
112 }
113 }
114}
115
116impl From<LimitExec> for Arc<dyn ExecutionPlan> {
117 fn from(limit_exec: LimitExec) -> Self {
118 match limit_exec {
119 LimitExec::Global(global) => Arc::new(global),
120 LimitExec::Local(local) => Arc::new(local),
121 }
122 }
123}
124
125pub fn pushdown_limit_helper(
133 mut pushdown_plan: Arc<dyn ExecutionPlan>,
134 mut global_state: GlobalRequirements,
135) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
136 if let Some(limit_exec) = extract_limit(&pushdown_plan) {
138 let (skip, fetch) = combine_limit(
141 global_state.skip,
142 global_state.fetch,
143 limit_exec.skip(),
144 limit_exec.fetch(),
145 );
146 global_state.skip = skip;
147 global_state.fetch = fetch;
148
149 return Ok((
153 Transformed {
154 data: Arc::clone(limit_exec.input()),
155 transformed: true,
156 tnr: TreeNodeRecursion::Stop,
157 },
158 global_state,
159 ));
160 }
161
162 if pushdown_plan.fetch().is_some() {
165 if global_state.fetch.is_none() {
166 global_state.satisfied = true;
167 }
168 (global_state.skip, global_state.fetch) = combine_limit(
169 global_state.skip,
170 global_state.fetch,
171 0,
172 pushdown_plan.fetch(),
173 );
174 }
175
176 let Some(global_fetch) = global_state.fetch else {
177 return if global_state.skip > 0 && !global_state.satisfied {
179 global_state.satisfied = true;
181 Ok((
182 Transformed::yes(add_global_limit(
183 pushdown_plan,
184 global_state.skip,
185 None,
186 )),
187 global_state,
188 ))
189 } else {
190 Ok((Transformed::no(pushdown_plan), global_state))
192 };
193 };
194
195 let skip_and_fetch = Some(global_fetch + global_state.skip);
196
197 if pushdown_plan.supports_limit_pushdown() {
198 if !combines_input_partitions(&pushdown_plan) {
199 Ok((Transformed::no(pushdown_plan), global_state))
202 } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
203 let mut new_plan = plan_with_fetch;
207 if global_state.skip > 0 {
210 new_plan =
211 add_global_limit(new_plan, global_state.skip, global_state.fetch);
212 }
213 global_state.fetch = skip_and_fetch;
214 global_state.skip = 0;
215 global_state.satisfied = true;
216 Ok((Transformed::yes(new_plan), global_state))
217 } else if global_state.satisfied {
218 Ok((Transformed::no(pushdown_plan), global_state))
220 } else {
221 global_state.satisfied = true;
222 Ok((
223 Transformed::yes(add_limit(
224 pushdown_plan,
225 global_state.skip,
226 global_fetch,
227 )),
228 global_state,
229 ))
230 }
231 } else {
232 let global_skip = global_state.skip;
238 global_state.fetch = None;
239 global_state.skip = 0;
240
241 let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
242 if global_state.satisfied {
243 if let Some(plan_with_fetch) = maybe_fetchable {
244 Ok((Transformed::yes(plan_with_fetch), global_state))
245 } else {
246 Ok((Transformed::no(pushdown_plan), global_state))
247 }
248 } else {
249 global_state.satisfied = true;
250 pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
251 if global_skip > 0 {
252 add_global_limit(plan_with_fetch, global_skip, Some(global_fetch))
253 } else {
254 plan_with_fetch
255 }
256 } else {
257 add_limit(pushdown_plan, global_skip, global_fetch)
258 };
259 Ok((Transformed::yes(pushdown_plan), global_state))
260 }
261 }
262}
263
264pub(crate) fn pushdown_limits(
266 pushdown_plan: Arc<dyn ExecutionPlan>,
267 global_state: GlobalRequirements,
268) -> Result<Arc<dyn ExecutionPlan>> {
269 let (mut new_node, mut global_state) =
272 pushdown_limit_helper(pushdown_plan, global_state)?;
273
274 while new_node.tnr == TreeNodeRecursion::Stop {
276 (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
277 }
278
279 let children = new_node.data.children();
281 let new_children = children
282 .into_iter()
283 .map(|child| {
284 pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
285 })
286 .collect::<Result<_>>()?;
287 new_node.data.with_new_children(new_children)
288}
289
290fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
293 if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
294 Some(LimitExec::Global(GlobalLimitExec::new(
295 Arc::clone(global_limit.input()),
296 global_limit.skip(),
297 global_limit.fetch(),
298 )))
299 } else {
300 plan.as_any()
301 .downcast_ref::<LocalLimitExec>()
302 .map(|local_limit| {
303 LimitExec::Local(LocalLimitExec::new(
304 Arc::clone(local_limit.input()),
305 local_limit.fetch(),
306 ))
307 })
308 }
309}
310
311fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
313 let plan = plan.as_any();
314 plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
315}
316
317fn add_limit(
320 pushdown_plan: Arc<dyn ExecutionPlan>,
321 skip: usize,
322 fetch: usize,
323) -> Arc<dyn ExecutionPlan> {
324 if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
325 add_global_limit(pushdown_plan, skip, Some(fetch))
326 } else {
327 Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
328 }
329}
330
331fn add_global_limit(
333 pushdown_plan: Arc<dyn ExecutionPlan>,
334 skip: usize,
335 fetch: Option<usize>,
336) -> Arc<dyn ExecutionPlan> {
337 Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
338}
339
340