datafusion_physical_optimizer/filter_pushdown.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Filter Pushdown Optimization Process
//!
//! The filter pushdown mechanism involves four key steps:
//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
//!    on the parent node, passing in the parent predicates and the pushdown phase. The parent node creates a [`FilterDescription`]
//!    by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
//!    passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
//!    containing information about which filters were successfully pushed down vs. unsupported.
//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
//!    passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
//!    how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
//!
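//! As a hedged sketch of one recursion step (simplified names; `recurse_into_children`
//! is a stand-in for the loop inside `push_down_filters` below, which holds the real
//! control flow):
//!
//! ```rust,ignore
//! // 1. Ask the node how the incoming filters route to each child.
//! let description = node.gather_filters_for_pushdown(phase, parent_filters, config)?;
//! // 2. + 3. Recurse into each child with its share of the filters and
//! //         collect which ones each child accepted.
//! let child_results = recurse_into_children(&node, &description)?;
//! // 4. Let the node react: absorb what was pushed, keep what was not.
//! let propagation = node.handle_child_pushdown_result(phase, child_results, config)?;
//! ```
//!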
//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription

use std::sync::Arc;

use crate::PhysicalOptimizerRule;

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{config::ConfigOptions, internal_err, Result};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::physical_expr::is_volatile;
use datafusion_physical_plan::filter_pushdown::{
    ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase,
    FilterPushdownPropagation, PushedDown,
};
use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};

use itertools::{izip, Itertools};

/// Attempts to recursively push given filters from the top of the tree into leaves.
///
/// # Default Implementation
///
/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
///
/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
///
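/// In other words (a hedged sketch of the defaults' net effect, not the literal
/// trait bodies; the field set of [`FilterPushdownPropagation`] is inferred from
/// its usage in this module), a node that opts out entirely behaves as if it
/// returned:
///
/// ```rust,ignore
/// // Every parent filter comes back marked PushedDown::No and the plan is
/// // left untouched, so the parent keeps its own FilterExec in place.
/// FilterPushdownPropagation {
///     filters: parent_filters.iter().map(|_| PushedDown::No).collect(),
///     updated_node: None,
/// }
/// ```
///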
/// # Example: Push filter into a `DataSourceExec`
///
/// For example, consider the following plan:
///
/// ```text
/// ┌──────────────────────┐
/// │ CoalesceBatchesExec  │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │      FilterExec      │
/// │  filters = [ id=1]   │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// └──────────────────────┘
/// ```
///
/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
///
/// If this filter is selective, pushing it into the scan can avoid massive
/// amounts of data being read from the source (the projection is `*`, so
/// all columns are read).
///
/// The new plan looks like:
///
/// ```text
/// ┌──────────────────────┐
/// │ CoalesceBatchesExec  │
/// └──────────────────────┘
///           │
///           ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// │   filters = [ id=1]  │
/// └──────────────────────┘
/// ```
///
/// # Example: Push filters with `ProjectionExec`
///
/// Let's consider a more complex example involving a [`ProjectionExec`]
/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
/// creates a new column that the filter depends on.
///
/// ```text
/// ┌──────────────────────┐
/// │ CoalesceBatchesExec  │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │      FilterExec      │
/// │    filters =         │
/// │     [cost>50,id=1]   │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │    ProjectionExec    │
/// │ cost = price * 1.2   │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// └──────────────────────┘
/// ```
///
/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
/// node to be executed first. A simple thing to do would be to split up the
/// filter into two separate filters and push down the first one:
///
/// ```text
/// ┌──────────────────────┐
/// │ CoalesceBatchesExec  │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │      FilterExec      │
/// │    filters =         │
/// │     [cost>50]        │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │    ProjectionExec    │
/// │ cost = price * 1.2   │
/// └──────────────────────┘
///             │
///             ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// │   filters = [ id=1]  │
/// └──────────────────────┘
/// ```
///
/// However, we can do better by pushing down `price * 1.2 > 50`
/// instead of `cost > 50`:
///
/// ```text
/// ┌──────────────────────┐
/// │ CoalesceBatchesExec  │
/// └──────────────────────┘
///            │
///            ▼
/// ┌──────────────────────┐
/// │    ProjectionExec    │
/// │ cost = price * 1.2   │
/// └──────────────────────┘
///            │
///            ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// │   filters = [id=1,   │
/// │   price * 1.2 > 50]  │
/// └──────────────────────┘
/// ```
///
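/// As a hedged sketch of the substitution (assuming the expression helpers
/// `binary`, `col`, and `lit` from `datafusion_physical_expr::expressions`;
/// this is not the literal rewrite code in this module):
///
/// ```rust,ignore
/// use arrow::datatypes::{DataType, Field, Schema};
/// use datafusion_expr::Operator;
/// use datafusion_physical_expr::expressions::{binary, col, lit};
///
/// // Schema of the projection *input*, which has `price` but not `cost`.
/// let schema = Schema::new(vec![Field::new("price", DataType::Float64, false)]);
/// // `cost` is defined as `price * 1.2`, so rewrite `cost > 50` by substitution:
/// let cost = binary(col("price", &schema)?, Operator::Multiply, lit(1.2), &schema)?;
/// let pushed = binary(cost, Operator::Gt, lit(50.0), &schema)?;
/// ```
///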
/// # Example: Push filters within a subtree
///
/// There are also cases where we may be able to push down filters within a
/// subtree but not the entire tree. A good example of this is aggregation
/// nodes:
///
/// ```text
/// ┌──────────────────────┐
/// │ ProjectionExec       │
/// │ projection = *       │
/// └──────────────────────┘
///           │
///           ▼
/// ┌──────────────────────┐
/// │ FilterExec           │
/// │ filters = [sum > 10] │
/// └──────────────────────┘
///           │
///           ▼
/// ┌───────────────────────┐
/// │     AggregateExec     │
/// │    group by = [id]    │
/// │    aggregate =        │
/// │      [sum(price)]     │
/// └───────────────────────┘
///           │
///           ▼
/// ┌──────────────────────┐
/// │ FilterExec           │
/// │ filters = [id=1]     │
/// └──────────────────────┘
///          │
///          ▼
/// ┌──────────────────────┐
/// │ DataSourceExec       │
/// │ projection = *       │
/// └──────────────────────┘
/// ```
///
/// The transformation here is to push down the `id=1` filter to the
/// `DataSourceExec` node:
///
/// ```text
/// ┌──────────────────────┐
/// │ ProjectionExec       │
/// │ projection = *       │
/// └──────────────────────┘
///           │
///           ▼
/// ┌──────────────────────┐
/// │ FilterExec           │
/// │ filters = [sum > 10] │
/// └──────────────────────┘
///           │
///           ▼
/// ┌───────────────────────┐
/// │     AggregateExec     │
/// │    group by = [id]    │
/// │    aggregate =        │
/// │      [sum(price)]     │
/// └───────────────────────┘
///           │
///           ▼
/// ┌──────────────────────┐
/// │ DataSourceExec       │
/// │ projection = *       │
/// │ filters = [id=1]     │
/// └──────────────────────┘
/// ```
///
/// The point here is that:
/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
///    Any filters above the [`AggregateExec`] node are not pushed down.
///    This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
///    down the `id=1` filter.
///
/// # Example: Push filters through Joins
///
/// It is also possible to push filters down through joins, and to push down
/// filters that originate from joins. For example, consider a hash join where
/// we build a hash table of the left side and probe the right side (setting
/// aside why we would choose this order; typically it depends on the size of
/// each table, etc.).
///
/// ```text
///              ┌─────────────────────┐
///              │     FilterExec      │
///              │ filters =           │
///              │  [d.size > 100]     │
///              └─────────────────────┘
///                         │
///                         │
///              ┌──────────▼──────────┐
///              │                     │
///              │    HashJoinExec     │
///              │ [u.dept@hash(d.id)] │
///              │                     │
///              └─────────────────────┘
///                         │
///            ┌────────────┴────────────┐
/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
/// │   DataSourceExec    │   │   DataSourceExec    │
/// │  alias [users as u] │   │  alias [dept as d]  │
/// │                     │   │                     │
/// └─────────────────────┘   └─────────────────────┘
/// ```
///
/// There are two pushdowns we can do here:
/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
///    node for the `departments` table.
/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
///    rows from the `users` table that will be eliminated by the join.
///    This can be done via a bloom filter or similar and is not (yet) supported
///    in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
///
/// ```text
///              ┌─────────────────────┐
///              │                     │
///              │    HashJoinExec     │
///              │ [u.dept@hash(d.id)] │
///              │                     │
///              └─────────────────────┘
///                         │
///            ┌────────────┴────────────┐
/// ┌──────────▼──────────┐   ┌──────────▼──────────┐
/// │   DataSourceExec    │   │   DataSourceExec    │
/// │  alias [users as u] │   │  alias [dept as d]  │
/// │ filters =           │   │  filters =          │
/// │   [dept@hash(d.id)] │   │    [ d.size > 100]  │
/// └─────────────────────┘   └─────────────────────┘
/// ```
///
/// You may notice in this case that the filter is *dynamic*: the hash table
/// is built _after_ the `departments` table is read and at runtime. We
/// don't have a concrete `InList` filter or similar to push down at
/// optimization time. These sorts of dynamic filters are handled by
/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
/// and internally maintains a reference to the hash table or other state.
///
/// To make working with these sorts of dynamic filters more tractable, we have
/// the method [`PhysicalExpr::snapshot`], which attempts to simplify a dynamic
/// filter into a "basic" non-dynamic filter.
/// For a join this could mean converting it to an `InList` filter or a min/max filter, for example.
/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
///
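/// A hedged sketch of taking such a point-in-time view (relying only on
/// [`PhysicalExpr::snapshot`] as linked below; not code from this module):
///
/// ```rust,ignore
/// // Ordinary expressions report nothing to snapshot; a dynamic expression
/// // may return a simplified, concrete predicate reflecting its current state.
/// if let Some(concrete) = dynamic_expr.snapshot()? {
///     // e.g. hand `concrete` (an InList or min/max bound) to the scan
/// }
/// ```
///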
/// # Example: Push TopK filters into Scans
///
/// Another form of dynamic filter is pushing down the state of a `TopK`
/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
///
/// ```text
/// ┌──────────────────────┐
/// │       TopK           │
/// │     limit = 10       │
/// │   order by = [id]    │
/// └──────────────────────┘
///            │
///            ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// └──────────────────────┘
/// ```
///
/// We can avoid large amounts of data processing by transforming this into:
///
/// ```text
/// ┌──────────────────────┐
/// │       TopK           │
/// │     limit = 10       │
/// │   order by = [id]    │
/// └──────────────────────┘
///            │
///            ▼
/// ┌──────────────────────┐
/// │    DataSourceExec    │
/// │    projection = *    │
/// │ filters =            │
/// │    [id < @ TopKHeap] │
/// └──────────────────────┘
/// ```
///
/// Now as we fill our `TopK` heap we can push down the state of the heap to
/// the `DataSourceExec` node to avoid reading files / row groups / pages /
/// rows that could not possibly be in the top 10.
///
/// This is not yet implemented in DataFusion. See
/// <https://github.com/apache/datafusion/issues/15037>
///
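/// # Example: Creating the rule for each phase
///
/// A hedged sketch of constructing both phases of this rule (how a session
/// registers optimizer rules varies; the constructors are defined below):
///
/// ```rust,ignore
/// // Runs early, while the plan is still being rewritten (static filters).
/// let pre = FilterPushdown::new();
/// // Runs after other optimizations, e.g. for dynamic filters whose source
/// // operators (joins, TopK) must already be in their final positions.
/// let post = FilterPushdown::new_post_optimization();
/// ```
///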
/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
#[derive(Debug)]
pub struct FilterPushdown {
    phase: FilterPushdownPhase,
    name: String,
}

impl FilterPushdown {
    fn new_with_phase(phase: FilterPushdownPhase) -> Self {
        let name = match phase {
            FilterPushdownPhase::Pre => "FilterPushdown",
            FilterPushdownPhase::Post => "FilterPushdown(Post)",
        }
        .to_string();
        Self { phase, name }
    }

    /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
    /// See [`FilterPushdownPhase`] for more details.
    pub fn new() -> Self {
        Self::new_with_phase(FilterPushdownPhase::Pre)
    }

    /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
    /// See [`FilterPushdownPhase`] for more details.
    pub fn new_post_optimization() -> Self {
        Self::new_with_phase(FilterPushdownPhase::Post)
    }
}

impl Default for FilterPushdown {
    fn default() -> Self {
        Self::new()
    }
}

impl PhysicalOptimizerRule for FilterPushdown {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(
            push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
                .updated_node
                .unwrap_or(plan),
        )
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn schema_check(&self) -> bool {
        true // Filter pushdown does not change the schema of the plan
    }
}

fn push_down_filters(
    node: Arc<dyn ExecutionPlan>,
    parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
    config: &ConfigOptions,
    phase: FilterPushdownPhase,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
    let mut parent_filter_pushdown_supports: Vec<Vec<PushedDown>> =
        vec![vec![]; parent_predicates.len()];
    let mut self_filters_pushdown_supports = vec![];
    let mut new_children = Vec::with_capacity(node.children().len());

    let children = node.children();

    // Filter out expressions that are not allowed for pushdown
    let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr);

    let filter_description = node.gather_filters_for_pushdown(
        phase,
        parent_filtered.items().to_vec(),
        config,
    )?;

    let filter_description_parent_filters = filter_description.parent_filters();
    let filter_description_self_filters = filter_description.self_filters();
    if filter_description_parent_filters.len() != children.len() {
        return internal_err!(
            "Filter pushdown expected FilterDescription to have parent filters for {} children, but got {} for node {}",
            children.len(),
            filter_description_parent_filters.len(),
            node.name()
        );
    }
    if filter_description_self_filters.len() != children.len() {
        return internal_err!(
            "Filter pushdown expected FilterDescription to have self filters for {} children, but got {} for node {}",
            children.len(),
            filter_description_self_filters.len(),
            node.name()
        );
    }

    for (child_idx, (child, parent_filters, self_filters)) in izip!(
        children,
        filter_description.parent_filters(),
        filter_description.self_filters()
    )
    .enumerate()
    {
        // Here, `parent_filters` are the predicates provided by the parent of the
        // current node that we are attempting to push down to the child this loop
        // iteration points at. `self_filters` are the predicates provided by the
        // current node itself, which we likewise attempt to push down to that child.

        // Filter out self_filters that contain volatile expressions and track indices
        let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr);

        let num_self_filters = self_filtered.len();
        let mut all_predicates = self_filtered.items().to_vec();

        // Apply second filter pass: collect indices of parent filters that can be pushed down
        let parent_filters_for_child = parent_filtered
            .chain_filter_slice(&parent_filters, |filter| {
                matches!(filter.discriminant, PushedDown::Yes)
            });

        // Add the filtered parent predicates to all_predicates
        for filter in parent_filters_for_child.items() {
            all_predicates.push(Arc::clone(&filter.predicate));
        }

        let num_parent_filters = all_predicates.len() - num_self_filters;
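        // Layout of `all_predicates` as handed to the child (one flat list):
        // [ self filters (num_self_filters) | parent filters (num_parent_filters) ]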

        // Any filters the child could not absorb will be reported back to our parent as unsupported
        let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?;

        if let Some(new_child) = result.updated_node {
            // The pushdown produced an updated plan for this child; adopt it
            new_children.push(new_child);
        } else {
            // The child was unchanged; keep the original
            new_children.push(Arc::clone(child));
        }

        // Our child doesn't know the difference between filters that were passed down
        // from our parents and filters that the current node injected. We need to
        // disentangle these, since we do need to distinguish between them.
        let mut all_filters = result.filters.into_iter().collect_vec();
        if all_filters.len() != num_self_filters + num_parent_filters {
            return internal_err!(
                "Filter pushdown did not return the expected number of filters: expected {} self filters and {} parent filters, but got {}. Likely culprit is {}",
                num_self_filters,
                num_parent_filters,
                all_filters.len(),
                child.name()
            );
        }
        let parent_filters = all_filters
            .split_off(num_self_filters)
            .into_iter()
            .collect_vec();
        // Map the results from filtered self filters back to their original positions using FilteredVec
        let mapped_self_results =
            self_filtered.map_results_to_original(all_filters, PushedDown::No);

        // Wrap each result with its corresponding expression
        let self_filter_results: Vec<_> = mapped_self_results
            .into_iter()
            .zip(self_filters)
            .map(|(support, filter)| support.wrap_expression(filter))
            .collect();

        self_filters_pushdown_supports.push(self_filter_results);

        // Start by marking all parent filters as unsupported for this child
        for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
            parent_filter_pushdown_support.push(PushedDown::No);
            assert_eq!(
                parent_filter_pushdown_support.len(),
                child_idx + 1,
                "Parent filter pushdown supports should have the same length as the number of children"
            );
        }
        // Map results from pushed-down filters back to original parent filter indices
        let mapped_parent_results = parent_filters_for_child
            .map_results_to_original(parent_filters, PushedDown::No);

        // Update parent_filter_pushdown_supports with the mapped results
        // mapped_parent_results already has the results at their original indices
        for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() {
            support[child_idx] = mapped_parent_results[idx];
        }
    }

    // Re-create this node with new children
    let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;

    // TODO: by calling `handle_child_pushdown_result` we are assuming that the
    // `ExecutionPlan` implementation will not change the plan itself.
    // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
    let mut res = updated_node.handle_child_pushdown_result(
        phase,
        ChildPushdownResult {
            parent_filters: parent_predicates
                .into_iter()
                .enumerate()
                .map(
                    |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult {
                        filter: parent_filter,
                        child_results: parent_filter_pushdown_supports[parent_filter_idx]
                            .clone(),
                    },
                )
                .collect(),
            self_filters: self_filters_pushdown_supports,
        },
        config,
    )?;
    // Compare pointers for `updated_node` and `node`; if they differ we must
    // report ourselves as replaced because our children changed.
    if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) {
        res.updated_node = Some(updated_node)
    }
    Ok(res)
}

/// A helper structure for filtering elements from a vector through multiple passes while
/// tracking their original indices, allowing results to be mapped back to the original positions.
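///
/// For example (a worked illustration, not code from this module): two passes
/// over `[a, b, c, d]` where pass 1 keeps `[a, c, d]` (mapping `[0, 2, 3]`) and
/// pass 2 keeps `[c, d]` (mapping `[1, 2]`); tracing index `0` of pass 2 goes
/// `0 -> 1 -> 2`, recovering the original position of `c`.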
struct FilteredVec<T> {
    items: Vec<T>,
    // Chain of index mappings: each Vec maps from current level to previous level
    // index_mappings[0] maps from first filter to original indices
    // index_mappings[1] maps from second filter to first filter indices, etc.
    index_mappings: Vec<Vec<usize>>,
    original_len: usize,
}

impl<T: Clone> FilteredVec<T> {
    /// Creates a new FilteredVec by filtering items based on the given predicate
    fn new<F>(items: &[T], predicate: F) -> Self
    where
        F: Fn(&T) -> bool,
    {
        let mut filtered_items = Vec::new();
        let mut original_indices = Vec::new();

        for (idx, item) in items.iter().enumerate() {
            if predicate(item) {
                filtered_items.push(item.clone());
                original_indices.push(idx);
            }
        }

        Self {
            items: filtered_items,
            index_mappings: vec![original_indices],
            original_len: items.len(),
        }
    }

    /// Returns a reference to the filtered items
    fn items(&self) -> &[T] {
        &self.items
    }

    /// Returns the number of filtered items
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Maps results from the filtered items back to their original positions.
    /// Returns a vector with the same length as the original input, filled with `default_value`
    /// and updated with results at their original positions.
    fn map_results_to_original<R: Clone>(
        &self,
        results: Vec<R>,
        default_value: R,
    ) -> Vec<R> {
        let mut mapped_results = vec![default_value; self.original_len];

        for (result_idx, result) in results.into_iter().enumerate() {
            let original_idx = self.trace_to_original_index(result_idx);
            mapped_results[original_idx] = result;
        }

        mapped_results
    }

    /// Traces a filtered index back to its original index through all filter passes
    fn trace_to_original_index(&self, mut current_idx: usize) -> usize {
        // Work backwards through the chain of index mappings
        for mapping in self.index_mappings.iter().rev() {
            current_idx = mapping[current_idx];
        }
        current_idx
    }

    /// Apply a filter to a new set of items while chaining the index mapping from self (the parent).
    /// This is useful when you have filtered items and then get a transformed slice
    /// (e.g., from `gather_filters_for_pushdown`) that you need to filter again.
    fn chain_filter_slice<U: Clone, F>(&self, items: &[U], predicate: F) -> FilteredVec<U>
    where
        F: Fn(&U) -> bool,
    {
        let mut filtered_items = Vec::new();
        let mut filtered_indices = Vec::new();

        for (idx, item) in items.iter().enumerate() {
            if predicate(item) {
                filtered_items.push(item.clone());
                filtered_indices.push(idx);
            }
        }

        // Chain the index mappings from parent (self)
        let mut index_mappings = self.index_mappings.clone();
        index_mappings.push(filtered_indices);

        FilteredVec {
            items: filtered_items,
            index_mappings,
            original_len: self.original_len,
        }
    }
}

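/// Returns `false` if `expr` (or any of its children) is volatile, since
/// volatile expressions (e.g. `random()`) may yield a different result on each
/// evaluation and therefore cannot be safely pushed below other operators.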
fn allow_pushdown_for_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
    let mut allow_pushdown = true;
    expr.apply(|e| {
        allow_pushdown = allow_pushdown && !is_volatile(e);
        if allow_pushdown {
            Ok(TreeNodeRecursion::Continue)
        } else {
            Ok(TreeNodeRecursion::Stop)
        }
    })
    .expect("Infallible traversal of PhysicalExpr tree failed");
    allow_pushdown
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_filtered_vec_single_pass() {
        let items = vec![1, 2, 3, 4, 5, 6];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        // Check filtered items
        assert_eq!(filtered.items(), &[2, 4, 6]);
        assert_eq!(filtered.len(), 3);

        // Check index mapping
        let results = vec!["a", "b", "c"];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["default", "a", "default", "b", "default", "c"]);
    }

    #[test]
    fn test_filtered_vec_empty_filter() {
        let items = vec![1, 3, 5];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        assert_eq!(filtered.items(), &[] as &[i32]);
        assert_eq!(filtered.len(), 0);

        let results: Vec<&str> = vec![];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["default", "default", "default"]);
    }

    #[test]
    fn test_filtered_vec_all_pass() {
        let items = vec![2, 4, 6];
        let filtered = FilteredVec::new(&items, |&x| x % 2 == 0);

        assert_eq!(filtered.items(), &[2, 4, 6]);
        assert_eq!(filtered.len(), 3);

        let results = vec!["a", "b", "c"];
        let mapped = filtered.map_results_to_original(results, "default");
        assert_eq!(mapped, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_chain_filter_slice_different_types() {
        // First pass: filter numbers
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let first_pass = FilteredVec::new(&numbers, |&x| x > 3);
        assert_eq!(first_pass.items(), &[4, 5, 6]);

        // Transform to strings (simulating gather_filters_for_pushdown transformation)
        let strings = vec!["four", "five", "six"];

        // Second pass: filter strings that contain 'i'
        let second_pass = first_pass.chain_filter_slice(&strings, |s| s.contains('i'));
        assert_eq!(second_pass.items(), &["five", "six"]);

        // Map results back to original indices
        let results = vec![100, 200];
        let mapped = second_pass.map_results_to_original(results, 0);
        // "five" was at original index 4, "six" was at original index 5
        assert_eq!(mapped, vec![0, 0, 0, 0, 100, 200]);
    }

    #[test]
    fn test_chain_filter_slice_complex_scenario() {
        // Simulating the filter pushdown scenario
        // Parent predicates: [A, B, C, D, E]
        let parent_predicates = vec!["A", "B", "C", "D", "E"];

        // First pass: filter out some predicates (simulating allow_pushdown_for_expr)
        let first_pass = FilteredVec::new(&parent_predicates, |s| *s != "B" && *s != "D");
        assert_eq!(first_pass.items(), &["A", "C", "E"]);

        // After gather_filters_for_pushdown, we get transformed results for a specific child
        // Let's say the child gets [A_transformed, C_transformed, E_transformed]
        // but only C and E can be pushed down
        #[derive(Clone, Debug, PartialEq)]
        struct TransformedPredicate {
            name: String,
            can_push: bool,
        }

        let child_predicates = vec![
            TransformedPredicate {
                name: "A_transformed".to_string(),
                can_push: false,
            },
            TransformedPredicate {
                name: "C_transformed".to_string(),
                can_push: true,
            },
            TransformedPredicate {
                name: "E_transformed".to_string(),
                can_push: true,
            },
        ];

        // Second pass: filter based on can_push
        let second_pass =
            first_pass.chain_filter_slice(&child_predicates, |p| p.can_push);
        assert_eq!(second_pass.len(), 2);
        assert_eq!(second_pass.items()[0].name, "C_transformed");
        assert_eq!(second_pass.items()[1].name, "E_transformed");

        // Simulate getting results back from the child
        let child_results = vec!["C_result", "E_result"];
        let mapped = second_pass.map_results_to_original(child_results, "no_result");

        // Results should be at original positions: C was at index 2, E was at index 4
        assert_eq!(
            mapped,
            vec![
                "no_result",
                "no_result",
                "C_result",
                "no_result",
                "E_result"
            ]
        );
    }

    #[test]
    fn test_trace_to_original_index() {
        let items = vec![10, 20, 30, 40, 50];
        let filtered = FilteredVec::new(&items, |&x| x != 20 && x != 40);

        // filtered items are [10, 30, 50] at original indices [0, 2, 4]
        assert_eq!(filtered.trace_to_original_index(0), 0); // 10 was at index 0
        assert_eq!(filtered.trace_to_original_index(1), 2); // 30 was at index 2
        assert_eq!(filtered.trace_to_original_index(2), 4); // 50 was at index 4
    }

    #[test]
    fn test_chain_filter_preserves_original_len() {
        let items = vec![1, 2, 3, 4, 5];
        let first = FilteredVec::new(&items, |&x| x > 2);

        let strings = vec!["three", "four", "five"];
        let second = first.chain_filter_slice(&strings, |s| s.len() == 4);

        // Original length should still be 5
        let results = vec!["x", "y"];
        let mapped = second.map_results_to_original(results, "-");
        assert_eq!(mapped.len(), 5);
    }
}
867}