datafusion_physical_optimizer/limit_pushdown.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce
//! data transfer as much as possible.
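//!
//! As a sketch (operator names are illustrative, not prescriptive), a plan like
//!
//! ```text
//! GlobalLimitExec: skip=0, fetch=5
//!   ProjectionExec
//!     StreamingTableExec
//! ```
//!
//! can be rewritten so that the fetch is applied as close to the data source
//! as possible:
//!
//! ```text
//! ProjectionExec
//!   StreamingTableExec: fetch=5
//! ```
//!
//! If an operator cannot absorb the fetch itself, a `LocalLimitExec` or
//! `GlobalLimitExec` is re-inserted at the lowest possible position instead.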

use std::fmt::Debug;
use std::sync::Arc;

use crate::PhysicalOptimizerRule;

use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
use datafusion_common::utils::combine_limit;
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// This rule inspects [`ExecutionPlan`]s and pushes down the fetch limit from
/// the parent to the child if applicable.
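///
/// # Example
///
/// A minimal sketch of applying the rule directly (it assumes `plan` is an
/// already-constructed `Arc<dyn ExecutionPlan>` and that the listed import
/// paths match your crate setup):
///
/// ```rust,ignore
/// use datafusion_common::config::ConfigOptions;
/// use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
/// use datafusion_physical_optimizer::PhysicalOptimizerRule;
///
/// let rule = LimitPushdown::new();
/// // `plan` is assumed to be built elsewhere.
/// let optimized = rule.optimize(plan, &ConfigOptions::default())?;
/// ```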
#[derive(Default, Debug)]
pub struct LimitPushdown {}

/// This is a "data class" we use within the [`LimitPushdown`] rule to push
/// down [`LimitExec`] in the plan. `GlobalRequirements` is held as rule-wide
/// state and stores the fetch and skip information. The struct also has a
/// field named `satisfied`, which indicates whether the "current" plan is
/// already valid with respect to the limits.
///
/// For example, if the plan is already satisfied with the current fetch info,
/// we decide not to add a `LocalLimitExec`.
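///
/// As an illustration: after absorbing a `GlobalLimitExec: skip=5, fetch=10`,
/// the state holds `fetch = Some(10)`, `skip = 5`, and `satisfied = false`
/// until an operator that enforces this window is found or inserted.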
///
/// [`LimitPushdown`]: crate::limit_pushdown::LimitPushdown
/// [`LimitExec`]: crate::limit_pushdown::LimitExec
#[derive(Default, Clone, Debug)]
pub struct GlobalRequirements {
    fetch: Option<usize>,
    skip: usize,
    satisfied: bool,
}

impl LimitPushdown {
    #[allow(missing_docs)]
    pub fn new() -> Self {
        Self {}
    }
}

impl PhysicalOptimizerRule for LimitPushdown {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let global_state = GlobalRequirements {
            fetch: None,
            skip: 0,
            satisfied: false,
        };
        pushdown_limits(plan, global_state)
    }

    fn name(&self) -> &str {
        "LimitPushdown"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

/// This enumeration makes `skip` and `fetch` calculations easier by providing
/// a single API for both local and global limit operators.
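///
/// For example, a `LimitExec::Global` built from
/// `GlobalLimitExec::new(input, 5, Some(10))` reports `skip() == 5` and
/// `fetch() == Some(10)`, while a `LimitExec::Local` built from
/// `LocalLimitExec::new(input, 10)` reports `skip() == 0` and
/// `fetch() == Some(10)`.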
#[derive(Debug)]
pub enum LimitExec {
    Global(GlobalLimitExec),
    Local(LocalLimitExec),
}

impl LimitExec {
    fn input(&self) -> &Arc<dyn ExecutionPlan> {
        match self {
            Self::Global(global) => global.input(),
            Self::Local(local) => local.input(),
        }
    }

    fn fetch(&self) -> Option<usize> {
        match self {
            Self::Global(global) => global.fetch(),
            Self::Local(local) => Some(local.fetch()),
        }
    }

    fn skip(&self) -> usize {
        match self {
            Self::Global(global) => global.skip(),
            Self::Local(_) => 0,
        }
    }
}

impl From<LimitExec> for Arc<dyn ExecutionPlan> {
    fn from(limit_exec: LimitExec) -> Self {
        match limit_exec {
            LimitExec::Global(global) => Arc::new(global),
            LimitExec::Local(local) => Arc::new(local),
        }
    }
}

/// This function is the main helper function of the `LimitPushdown` rule.
/// The helper takes an `ExecutionPlan` and the global (algorithm-wide) state,
/// an instance of `GlobalRequirements`, and modifies both while checking
/// whether the limits can be pushed down.
///
/// If a limit is encountered, [`TreeNodeRecursion::Stop`] is returned;
/// otherwise, [`TreeNodeRecursion::Continue`] is returned.
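///
/// For example (a sketch), when the helper sees a `GlobalLimitExec: skip=0,
/// fetch=5`, it folds that limit into the global state (`fetch = Some(5)`) and
/// returns the limit's child together with [`TreeNodeRecursion::Stop`];
/// `pushdown_limits` then re-runs the helper on that child to decide where the
/// limit (or a fetch) should be re-introduced.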
pub fn pushdown_limit_helper(
    mut pushdown_plan: Arc<dyn ExecutionPlan>,
    mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
    // Extract the limit, if one exists, and return its child input.
    if let Some(limit_exec) = extract_limit(&pushdown_plan) {
        // If we already have fetch/skip info in the global state, we need to
        // decide which one to continue with:
        let (skip, fetch) = combine_limit(
            global_state.skip,
            global_state.fetch,
            limit_exec.skip(),
            limit_exec.fetch(),
        );
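        // For example, starting from the default state (`skip = 0`,
        // `fetch = None`), absorbing a `GlobalLimitExec: skip=5, fetch=10`
        // yields `skip = 5` and `fetch = Some(10)` in the global state.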
        global_state.skip = skip;
        global_state.fetch = fetch;

        // Now that the global state has the most recent information, we can
        // remove the `LimitExec` plan. We will decide later whether to add it
        // back or not.
        return Ok((
            Transformed {
                data: Arc::clone(limit_exec.input()),
                transformed: true,
                tnr: TreeNodeRecursion::Stop,
            },
            global_state,
        ));
    }

    // If we have a non-limit operator with fetch capability, update the global
    // state as necessary:
    if pushdown_plan.fetch().is_some() {
        if global_state.fetch.is_none() {
            global_state.satisfied = true;
        }
        (global_state.skip, global_state.fetch) = combine_limit(
            global_state.skip,
            global_state.fetch,
            0,
            pushdown_plan.fetch(),
        );
    }

    let Some(global_fetch) = global_state.fetch else {
        // There's no valid fetch information, exit early:
        return if global_state.skip > 0 && !global_state.satisfied {
            // There might be a case with only an offset; if so, add a global limit:
            global_state.satisfied = true;
            Ok((
                Transformed::yes(add_global_limit(
                    pushdown_plan,
                    global_state.skip,
                    None,
                )),
                global_state,
            ))
        } else {
            // There's no info on offset or fetch, nothing to do:
            Ok((Transformed::no(pushdown_plan), global_state))
        };
    };

    let skip_and_fetch = Some(global_fetch + global_state.skip);
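    // For example, with `global_fetch = 10` and `skip = 5`, the child must
    // produce `skip + fetch = 15` rows so that 10 rows remain after skipping 5.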

    if pushdown_plan.supports_limit_pushdown() {
        if !combines_input_partitions(&pushdown_plan) {
            // We have information in the global state and the plan supports
            // pushdown, so continue:
            Ok((Transformed::no(pushdown_plan), global_state))
        } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
            // This plan is combining input partitions, so we need to add the
            // fetch info to the plan if possible. If not, we must add a
            // `LimitExec` with the information from the global state.
            let mut new_plan = plan_with_fetch;
            // Execution plans can't (yet) handle skip, so if we have one,
            // we still need to add a global limit:
            if global_state.skip > 0 {
                new_plan =
                    add_global_limit(new_plan, global_state.skip, global_state.fetch);
            }
            global_state.fetch = skip_and_fetch;
            global_state.skip = 0;
            global_state.satisfied = true;
            Ok((Transformed::yes(new_plan), global_state))
        } else if global_state.satisfied {
            // If the plan is already satisfied, do not add a limit:
            Ok((Transformed::no(pushdown_plan), global_state))
        } else {
            global_state.satisfied = true;
            Ok((
                Transformed::yes(add_limit(
                    pushdown_plan,
                    global_state.skip,
                    global_fetch,
                )),
                global_state,
            ))
        }
    } else {
        // The plan does not support pushdown and it is not a limit. We will need
        // to add a limit or a fetch. If the plan is already satisfied, we will try
        // to add the fetch info and return the plan.

        // There's no pushdown, change fetch & skip to default values:
        let global_skip = global_state.skip;
        global_state.fetch = None;
        global_state.skip = 0;

        let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
        if global_state.satisfied {
            if let Some(plan_with_fetch) = maybe_fetchable {
                Ok((Transformed::yes(plan_with_fetch), global_state))
            } else {
                Ok((Transformed::no(pushdown_plan), global_state))
            }
        } else {
            global_state.satisfied = true;
            pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
                if global_skip > 0 {
                    add_global_limit(plan_with_fetch, global_skip, Some(global_fetch))
                } else {
                    plan_with_fetch
                }
            } else {
                add_limit(pushdown_plan, global_skip, global_fetch)
            };
            Ok((Transformed::yes(pushdown_plan), global_state))
        }
    }
}

/// Pushes down the limit through the plan.
pub(crate) fn pushdown_limits(
    pushdown_plan: Arc<dyn ExecutionPlan>,
    global_state: GlobalRequirements,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Call `pushdown_limit_helper`. This will either extract the limit node
    // (returning its child), or apply the limit pushdown:
    let (mut new_node, mut global_state) =
        pushdown_limit_helper(pushdown_plan, global_state)?;

    // While limits exist, continue combining them into the global state:
    while new_node.tnr == TreeNodeRecursion::Stop {
        (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
    }

    // Apply the limit pushdown to the children:
    let children = new_node.data.children();
    let new_children = children
        .into_iter()
        .map(|child| {
            pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
        })
        .collect::<Result<_>>()?;
    new_node.data.with_new_children(new_children)
}

/// Transforms the [`ExecutionPlan`] into a [`LimitExec`] if it is a
/// [`GlobalLimitExec`] or a [`LocalLimitExec`].
fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
    if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
        Some(LimitExec::Global(GlobalLimitExec::new(
            Arc::clone(global_limit.input()),
            global_limit.skip(),
            global_limit.fetch(),
        )))
    } else {
        plan.as_any()
            .downcast_ref::<LocalLimitExec>()
            .map(|local_limit| {
                LimitExec::Local(LocalLimitExec::new(
                    Arc::clone(local_limit.input()),
                    local_limit.fetch(),
                ))
            })
    }
}

/// Checks if the given plan combines input partitions.
fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
    let plan = plan.as_any();
    plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
}

/// Adds a limit to the plan, choosing between a global and a local limit based
/// on the skip value and the number of partitions.
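///
/// For example, with `skip == 0` and more than one output partition, a
/// `LocalLimitExec` with `fetch + skip` is used; with `skip > 0` or a single
/// output partition, a `GlobalLimitExec` with `skip` and `Some(fetch)` is used.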
fn add_limit(
    pushdown_plan: Arc<dyn ExecutionPlan>,
    skip: usize,
    fetch: usize,
) -> Arc<dyn ExecutionPlan> {
    if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
        add_global_limit(pushdown_plan, skip, Some(fetch))
    } else {
        Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
    }
}

/// Adds a global limit to the plan.
fn add_global_limit(
    pushdown_plan: Arc<dyn ExecutionPlan>,
    skip: usize,
    fetch: Option<usize>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
}

// See tests in datafusion/core/tests/physical_optimizer