datafusion_physical_optimizer/filter_pushdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Filter Pushdown Optimization Process
19//!
20//! The filter pushdown mechanism involves four key steps:
21//! 1. **Optimizer Asks Parent for a Filter Pushdown Plan**: The optimizer calls [`ExecutionPlan::gather_filters_for_pushdown`]
22//! on the parent node, passing in parent predicates and phase. The parent node creates a [`FilterDescription`]
23//! by inspecting its logic and children's schemas, determining which filters can be pushed to each child.
24//! 2. **Optimizer Executes Pushdown**: The optimizer recursively calls `push_down_filters` in this module on each child,
25//! passing the appropriate filters (`Vec<Arc<dyn PhysicalExpr>>`) for that child.
26//! 3. **Optimizer Gathers Results**: The optimizer collects [`FilterPushdownPropagation`] results from children,
27//! containing information about which filters were successfully pushed down vs. unsupported.
28//! 4. **Parent Responds**: The optimizer calls [`ExecutionPlan::handle_child_pushdown_result`] on the parent,
29//! passing a [`ChildPushdownResult`] containing the aggregated pushdown outcomes. The parent decides
30//! how to handle filters that couldn't be pushed down (e.g., keep them as FilterExec nodes).
31//!
32//! [`FilterDescription`]: datafusion_physical_plan::filter_pushdown::FilterDescription
33
34use std::sync::Arc;
35
36use crate::PhysicalOptimizerRule;
37
38use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
39use datafusion_common::{config::ConfigOptions, internal_err, Result};
40use datafusion_physical_expr::PhysicalExpr;
41use datafusion_physical_expr_common::physical_expr::is_volatile;
42use datafusion_physical_plan::filter_pushdown::{
43 ChildFilterPushdownResult, ChildPushdownResult, FilterPushdownPhase,
44 FilterPushdownPropagation, PushedDown,
45};
46use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
47
48use itertools::{izip, Itertools};
49
50/// Attempts to recursively push given filters from the top of the tree into leaves.
51///
52/// # Default Implementation
53///
54/// The default implementation in [`ExecutionPlan::gather_filters_for_pushdown`]
55/// and [`ExecutionPlan::handle_child_pushdown_result`] assumes that:
56///
57/// * Parent filters can't be passed onto children (determined by [`ExecutionPlan::gather_filters_for_pushdown`])
58/// * This node has no filters to contribute (determined by [`ExecutionPlan::gather_filters_for_pushdown`]).
59/// * Any filters that could not be pushed down to the children are marked as unsupported (determined by [`ExecutionPlan::handle_child_pushdown_result`]).
60///
61/// # Example: Push filter into a `DataSourceExec`
62///
63/// For example, consider the following plan:
64///
65/// ```text
66/// ┌──────────────────────┐
67/// │ CoalesceBatchesExec │
68/// └──────────────────────┘
69/// │
70/// ▼
71/// ┌──────────────────────┐
72/// │ FilterExec │
73/// │ filters = [ id=1] │
74/// └──────────────────────┘
75/// │
76/// ▼
77/// ┌──────────────────────┐
78/// │ DataSourceExec │
79/// │ projection = * │
80/// └──────────────────────┘
81/// ```
82///
83/// Our goal is to move the `id = 1` filter from the [`FilterExec`] node to the `DataSourceExec` node.
84///
/// If this filter is selective, pushing it into the scan can avoid massive
86/// amounts of data being read from the source (the projection is `*` so all
87/// matching columns are read).
88///
89/// The new plan looks like:
90///
91/// ```text
92/// ┌──────────────────────┐
93/// │ CoalesceBatchesExec │
94/// └──────────────────────┘
95/// │
96/// ▼
97/// ┌──────────────────────┐
98/// │ DataSourceExec │
99/// │ projection = * │
100/// │ filters = [ id=1] │
101/// └──────────────────────┘
102/// ```
103///
104/// # Example: Push filters with `ProjectionExec`
105///
106/// Let's consider a more complex example involving a [`ProjectionExec`]
107/// node in between the [`FilterExec`] and `DataSourceExec` nodes that
108/// creates a new column that the filter depends on.
109///
110/// ```text
111/// ┌──────────────────────┐
112/// │ CoalesceBatchesExec │
113/// └──────────────────────┘
114/// │
115/// ▼
116/// ┌──────────────────────┐
117/// │ FilterExec │
118/// │ filters = │
119/// │ [cost>50,id=1] │
120/// └──────────────────────┘
121/// │
122/// ▼
123/// ┌──────────────────────┐
124/// │ ProjectionExec │
125/// │ cost = price * 1.2 │
126/// └──────────────────────┘
127/// │
128/// ▼
129/// ┌──────────────────────┐
130/// │ DataSourceExec │
131/// │ projection = * │
132/// └──────────────────────┘
133/// ```
134///
135/// We want to push down the filters `[id=1]` to the `DataSourceExec` node,
136/// but can't push down `cost>50` because it requires the [`ProjectionExec`]
137/// node to be executed first. A simple thing to do would be to split up the
138/// filter into two separate filters and push down the first one:
139///
140/// ```text
141/// ┌──────────────────────┐
142/// │ CoalesceBatchesExec │
143/// └──────────────────────┘
144/// │
145/// ▼
146/// ┌──────────────────────┐
147/// │ FilterExec │
148/// │ filters = │
149/// │ [cost>50] │
150/// └──────────────────────┘
151/// │
152/// ▼
153/// ┌──────────────────────┐
154/// │ ProjectionExec │
155/// │ cost = price * 1.2 │
156/// └──────────────────────┘
157/// │
158/// ▼
159/// ┌──────────────────────┐
160/// │ DataSourceExec │
161/// │ projection = * │
162/// │ filters = [ id=1] │
163/// └──────────────────────┘
164/// ```
165///
/// However, we can actually do better by pushing down `price * 1.2 > 50`
/// instead of `cost > 50`:
168///
169/// ```text
170/// ┌──────────────────────┐
171/// │ CoalesceBatchesExec │
172/// └──────────────────────┘
173/// │
174/// ▼
175/// ┌──────────────────────┐
176/// │ ProjectionExec │
177/// │ cost = price * 1.2 │
178/// └──────────────────────┘
179/// │
180/// ▼
181/// ┌──────────────────────┐
182/// │ DataSourceExec │
183/// │ projection = * │
184/// │ filters = [id=1, │
185/// │ price * 1.2 > 50] │
186/// └──────────────────────┘
187/// ```
188///
189/// # Example: Push filters within a subtree
190///
191/// There are also cases where we may be able to push down filters within a
192/// subtree but not the entire tree. A good example of this is aggregation
193/// nodes:
194///
195/// ```text
196/// ┌──────────────────────┐
197/// │ ProjectionExec │
198/// │ projection = * │
199/// └──────────────────────┘
200/// │
201/// ▼
202/// ┌──────────────────────┐
203/// │ FilterExec │
204/// │ filters = [sum > 10] │
205/// └──────────────────────┘
206/// │
207/// ▼
208/// ┌───────────────────────┐
209/// │ AggregateExec │
210/// │ group by = [id] │
211/// │ aggregate = │
212/// │ [sum(price)] │
213/// └───────────────────────┘
214/// │
215/// ▼
216/// ┌──────────────────────┐
217/// │ FilterExec │
218/// │ filters = [id=1] │
219/// └──────────────────────┘
220/// │
221/// ▼
222/// ┌──────────────────────┐
223/// │ DataSourceExec │
224/// │ projection = * │
225/// └──────────────────────┘
226/// ```
227///
228/// The transformation here is to push down the `id=1` filter to the
229/// `DataSourceExec` node:
230///
231/// ```text
232/// ┌──────────────────────┐
233/// │ ProjectionExec │
234/// │ projection = * │
235/// └──────────────────────┘
236/// │
237/// ▼
238/// ┌──────────────────────┐
239/// │ FilterExec │
240/// │ filters = [sum > 10] │
241/// └──────────────────────┘
242/// │
243/// ▼
244/// ┌───────────────────────┐
245/// │ AggregateExec │
246/// │ group by = [id] │
247/// │ aggregate = │
248/// │ [sum(price)] │
249/// └───────────────────────┘
250/// │
251/// ▼
252/// ┌──────────────────────┐
253/// │ DataSourceExec │
254/// │ projection = * │
255/// │ filters = [id=1] │
256/// └──────────────────────┘
257/// ```
258///
259/// The point here is that:
260/// 1. We cannot push down `sum > 10` through the [`AggregateExec`] node into the `DataSourceExec` node.
261/// Any filters above the [`AggregateExec`] node are not pushed down.
262/// This is determined by calling [`ExecutionPlan::gather_filters_for_pushdown`] on the [`AggregateExec`] node.
263/// 2. We need to keep recursing into the tree so that we can discover the other [`FilterExec`] node and push
264/// down the `id=1` filter.
265///
266/// # Example: Push filters through Joins
267///
268/// It is also possible to push down filters through joins and filters that
269/// originate from joins. For example, a hash join where we build a hash
270/// table of the left side and probe the right side (ignoring why we would
271/// choose this order, typically it depends on the size of each table,
272/// etc.).
273///
274/// ```text
275/// ┌─────────────────────┐
276/// │ FilterExec │
277/// │ filters = │
278/// │ [d.size > 100] │
279/// └─────────────────────┘
280/// │
281/// │
282/// ┌──────────▼──────────┐
283/// │ │
284/// │ HashJoinExec │
285/// │ [u.dept@hash(d.id)] │
286/// │ │
287/// └─────────────────────┘
288/// │
289/// ┌────────────┴────────────┐
290/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
291/// │ DataSourceExec │ │ DataSourceExec │
292/// │ alias [users as u] │ │ alias [dept as d] │
293/// │ │ │ │
294/// └─────────────────────┘ └─────────────────────┘
295/// ```
296///
297/// There are two pushdowns we can do here:
298/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec`
299/// node for the `departments` table.
300/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading
301/// rows from the `users` table that will be eliminated by the join.
302/// This can be done via a bloom filter or similar and is not (yet) supported
303/// in DataFusion. See <https://github.com/apache/datafusion/issues/7955>.
304///
305/// ```text
306/// ┌─────────────────────┐
307/// │ │
308/// │ HashJoinExec │
309/// │ [u.dept@hash(d.id)] │
310/// │ │
311/// └─────────────────────┘
312/// │
313/// ┌────────────┴────────────┐
314/// ┌──────────▼──────────┐ ┌──────────▼──────────┐
315/// │ DataSourceExec │ │ DataSourceExec │
316/// │ alias [users as u] │ │ alias [dept as d] │
317/// │ filters = │ │ filters = │
/// │ [dept@hash(d.id)] │ │ [ d.size > 100] │
319/// └─────────────────────┘ └─────────────────────┘
320/// ```
321///
322/// You may notice in this case that the filter is *dynamic*: the hash table
323/// is built _after_ the `departments` table is read and at runtime. We
324/// don't have a concrete `InList` filter or similar to push down at
325/// optimization time. These sorts of dynamic filters are handled by
326/// building a specialized [`PhysicalExpr`] that can be evaluated at runtime
327/// and internally maintains a reference to the hash table or other state.
328///
329/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`]
330/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter.
331/// For a join this could mean converting it to an `InList` filter or a min/max filter for example.
332/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details.
333///
334/// # Example: Push TopK filters into Scans
335///
336/// Another form of dynamic filter is pushing down the state of a `TopK`
337/// operator for queries like `SELECT * FROM t ORDER BY id LIMIT 10`:
338///
339/// ```text
340/// ┌──────────────────────┐
341/// │ TopK │
342/// │ limit = 10 │
343/// │ order by = [id] │
344/// └──────────────────────┘
345/// │
346/// ▼
347/// ┌──────────────────────┐
348/// │ DataSourceExec │
349/// │ projection = * │
350/// └──────────────────────┘
351/// ```
352///
353/// We can avoid large amounts of data processing by transforming this into:
354///
355/// ```text
356/// ┌──────────────────────┐
357/// │ TopK │
358/// │ limit = 10 │
359/// │ order by = [id] │
360/// └──────────────────────┘
361/// │
362/// ▼
363/// ┌──────────────────────┐
364/// │ DataSourceExec │
365/// │ projection = * │
366/// │ filters = │
367/// │ [id < @ TopKHeap] │
368/// └──────────────────────┘
369/// ```
370///
371/// Now as we fill our `TopK` heap we can push down the state of the heap to
372/// the `DataSourceExec` node to avoid reading files / row groups / pages /
373/// rows that could not possibly be in the top 10.
374///
375/// This is not yet implemented in DataFusion. See
376/// <https://github.com/apache/datafusion/issues/15037>
377///
378/// [`PhysicalExpr`]: datafusion_physical_plan::PhysicalExpr
379/// [`PhysicalExpr::snapshot`]: datafusion_physical_plan::PhysicalExpr::snapshot
380/// [`FilterExec`]: datafusion_physical_plan::filter::FilterExec
381/// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
382/// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
#[derive(Debug)]
pub struct FilterPushdown {
    /// Phase this rule runs in; forwarded to `push_down_filters` on every
    /// call to `optimize`.
    phase: FilterPushdownPhase,
    /// Rule name returned by `PhysicalOptimizerRule::name`; picked from
    /// `phase` when the rule is constructed.
    name: String,
}
388
389impl FilterPushdown {
390 fn new_with_phase(phase: FilterPushdownPhase) -> Self {
391 let name = match phase {
392 FilterPushdownPhase::Pre => "FilterPushdown",
393 FilterPushdownPhase::Post => "FilterPushdown(Post)",
394 }
395 .to_string();
396 Self { phase, name }
397 }
398
399 /// Create a new [`FilterPushdown`] optimizer rule that runs in the pre-optimization phase.
400 /// See [`FilterPushdownPhase`] for more details.
401 pub fn new() -> Self {
402 Self::new_with_phase(FilterPushdownPhase::Pre)
403 }
404
405 /// Create a new [`FilterPushdown`] optimizer rule that runs in the post-optimization phase.
406 /// See [`FilterPushdownPhase`] for more details.
407 pub fn new_post_optimization() -> Self {
408 Self::new_with_phase(FilterPushdownPhase::Post)
409 }
410}
411
412impl Default for FilterPushdown {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
418impl PhysicalOptimizerRule for FilterPushdown {
419 fn optimize(
420 &self,
421 plan: Arc<dyn ExecutionPlan>,
422 config: &ConfigOptions,
423 ) -> Result<Arc<dyn ExecutionPlan>> {
424 Ok(
425 push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
426 .updated_node
427 .unwrap_or(plan),
428 )
429 }
430
431 fn name(&self) -> &str {
432 &self.name
433 }
434
435 fn schema_check(&self) -> bool {
436 true // Filter pushdown does not change the schema of the plan
437 }
438}
439
440fn push_down_filters(
441 node: Arc<dyn ExecutionPlan>,
442 parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
443 config: &ConfigOptions,
444 phase: FilterPushdownPhase,
445) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
446 let mut parent_filter_pushdown_supports: Vec<Vec<PushedDown>> =
447 vec![vec![]; parent_predicates.len()];
448 let mut self_filters_pushdown_supports = vec![];
449 let mut new_children = Vec::with_capacity(node.children().len());
450
451 let children = node.children();
452
453 // Filter out expressions that are not allowed for pushdown
454 let parent_filtered = FilteredVec::new(&parent_predicates, allow_pushdown_for_expr);
455
456 let filter_description = node.gather_filters_for_pushdown(
457 phase,
458 parent_filtered.items().to_vec(),
459 config,
460 )?;
461
462 let filter_description_parent_filters = filter_description.parent_filters();
463 let filter_description_self_filters = filter_description.self_filters();
464 if filter_description_parent_filters.len() != children.len() {
465 return internal_err!(
466 "Filter pushdown expected FilterDescription to have parent filters for {}, but got {} for node {}",
467 children.len(),
468 filter_description_parent_filters.len(),
469 node.name()
470 );
471 }
472 if filter_description_self_filters.len() != children.len() {
473 return internal_err!(
474 "Filter pushdown expected FilterDescription to have self filters for {}, but got {} for node {}",
475 children.len(),
476 filter_description_self_filters.len(),
477 node.name()
478 );
479 }
480
481 for (child_idx, (child, parent_filters, self_filters)) in izip!(
482 children,
483 filter_description.parent_filters(),
484 filter_description.self_filters()
485 )
486 .enumerate()
487 {
488 // Here, `parent_filters` are the predicates which are provided by the parent node of
489 // the current node, and tried to be pushed down over the child which the loop points
490 // currently. `self_filters` are the predicates which are provided by the current node,
491 // and tried to be pushed down over the child similarly.
492
493 // Filter out self_filters that contain volatile expressions and track indices
494 let self_filtered = FilteredVec::new(&self_filters, allow_pushdown_for_expr);
495
496 let num_self_filters = self_filtered.len();
497 let mut all_predicates = self_filtered.items().to_vec();
498
499 // Apply second filter pass: collect indices of parent filters that can be pushed down
500 let parent_filters_for_child = parent_filtered
501 .chain_filter_slice(&parent_filters, |filter| {
502 matches!(filter.discriminant, PushedDown::Yes)
503 });
504
505 // Add the filtered parent predicates to all_predicates
506 for filter in parent_filters_for_child.items() {
507 all_predicates.push(Arc::clone(&filter.predicate));
508 }
509
510 let num_parent_filters = all_predicates.len() - num_self_filters;
511
512 // Any filters that could not be pushed down to a child are marked as not-supported to our parents
513 let result = push_down_filters(Arc::clone(child), all_predicates, config, phase)?;
514
515 if let Some(new_child) = result.updated_node {
516 // If we have a filter pushdown result, we need to update our children
517 new_children.push(new_child);
518 } else {
519 // If we don't have a filter pushdown result, we need to update our children
520 new_children.push(Arc::clone(child));
521 }
522
523 // Our child doesn't know the difference between filters that were passed down
524 // from our parents and filters that the current node injected. We need to de-entangle
525 // this since we do need to distinguish between them.
526 let mut all_filters = result.filters.into_iter().collect_vec();
527 if all_filters.len() != num_self_filters + num_parent_filters {
528 return internal_err!(
529 "Filter pushdown did not return the expected number of filters: expected {} self filters and {} parent filters, but got {}. Likely culprit is {}",
530 num_self_filters,
531 num_parent_filters,
532 all_filters.len(),
533 child.name()
534 );
535 }
536 let parent_filters = all_filters
537 .split_off(num_self_filters)
538 .into_iter()
539 .collect_vec();
540 // Map the results from filtered self filters back to their original positions using FilteredVec
541 let mapped_self_results =
542 self_filtered.map_results_to_original(all_filters, PushedDown::No);
543
544 // Wrap each result with its corresponding expression
545 let self_filter_results: Vec<_> = mapped_self_results
546 .into_iter()
547 .zip(self_filters)
548 .map(|(support, filter)| support.wrap_expression(filter))
549 .collect();
550
551 self_filters_pushdown_supports.push(self_filter_results);
552
553 // Start by marking all parent filters as unsupported for this child
554 for parent_filter_pushdown_support in parent_filter_pushdown_supports.iter_mut() {
555 parent_filter_pushdown_support.push(PushedDown::No);
556 assert_eq!(
557 parent_filter_pushdown_support.len(),
558 child_idx + 1,
559 "Parent filter pushdown supports should have the same length as the number of children"
560 );
561 }
562 // Map results from pushed-down filters back to original parent filter indices
563 let mapped_parent_results = parent_filters_for_child
564 .map_results_to_original(parent_filters, PushedDown::No);
565
566 // Update parent_filter_pushdown_supports with the mapped results
567 // mapped_parent_results already has the results at their original indices
568 for (idx, support) in parent_filter_pushdown_supports.iter_mut().enumerate() {
569 support[child_idx] = mapped_parent_results[idx];
570 }
571 }
572
573 // Re-create this node with new children
574 let updated_node = with_new_children_if_necessary(Arc::clone(&node), new_children)?;
575
576 // TODO: by calling `handle_child_pushdown_result` we are assuming that the
577 // `ExecutionPlan` implementation will not change the plan itself.
578 // Should we have a separate method for dynamic pushdown that does not allow modifying the plan?
579 let mut res = updated_node.handle_child_pushdown_result(
580 phase,
581 ChildPushdownResult {
582 parent_filters: parent_predicates
583 .into_iter()
584 .enumerate()
585 .map(
586 |(parent_filter_idx, parent_filter)| ChildFilterPushdownResult {
587 filter: parent_filter,
588 child_results: parent_filter_pushdown_supports[parent_filter_idx]
589 .clone(),
590 },
591 )
592 .collect(),
593 self_filters: self_filters_pushdown_supports,
594 },
595 config,
596 )?;
597 // Compare pointers for new_node and node, if they are different we must replace
598 // ourselves because of changes in our children.
599 if res.updated_node.is_none() && !Arc::ptr_eq(&updated_node, &node) {
600 res.updated_node = Some(updated_node)
601 }
602 Ok(res)
603}
604
/// Keeps the elements of a slice that survive one or more filter passes and
/// remembers where each survivor sat in the very first input, so that
/// per-survivor results can be scattered back to those original positions.
struct FilteredVec<T> {
    /// Elements that passed the most recent filter pass, in input order.
    items: Vec<T>,
    /// One index table per filter pass. `index_mappings[0]` maps positions
    /// after the first pass to positions in the original input;
    /// `index_mappings[1]` maps positions after the second pass to positions
    /// after the first pass, and so on.
    index_mappings: Vec<Vec<usize>>,
    /// Length of the input given to the first pass.
    original_len: usize,
}

impl<T: Clone> FilteredVec<T> {
    /// Runs the first filter pass: clones every element of `items` accepted by
    /// `predicate` and records its index in `items`.
    fn new<F>(items: &[T], predicate: F) -> Self
    where
        F: Fn(&T) -> bool,
    {
        let (kept_indices, kept_items): (Vec<usize>, Vec<T>) = items
            .iter()
            .enumerate()
            .filter(|&(_, item)| predicate(item))
            .map(|(idx, item)| (idx, item.clone()))
            .unzip();

        Self {
            items: kept_items,
            index_mappings: vec![kept_indices],
            original_len: items.len(),
        }
    }

    /// The elements that survived filtering, in input order.
    fn items(&self) -> &[T] {
        self.items.as_slice()
    }

    /// How many elements survived filtering.
    fn len(&self) -> usize {
        self.items.len()
    }

    /// Scatters `results` (one per surviving element, in order) back to the
    /// positions of the original input. The returned vector has the original
    /// input's length; every slot without a result holds `default_value`.
    ///
    /// Panics if `results` has more entries than there are surviving elements.
    fn map_results_to_original<R: Clone>(
        &self,
        results: Vec<R>,
        default_value: R,
    ) -> Vec<R> {
        let mut scattered = vec![default_value; self.original_len];
        for (filtered_pos, value) in results.into_iter().enumerate() {
            let original_pos = self.trace_to_original_index(filtered_pos);
            scattered[original_pos] = value;
        }
        scattered
    }

    /// Translates an index into the filtered items back to an index into the
    /// original input by walking the index tables from the latest pass to the
    /// first.
    ///
    /// Panics if the index is out of range for any table along the way.
    fn trace_to_original_index(&self, current_idx: usize) -> usize {
        self.index_mappings
            .iter()
            .rev()
            .fold(current_idx, |idx, mapping| mapping[idx])
    }

    /// Runs a further filter pass over `items`, a slice that stands in for this
    /// `FilteredVec`'s surviving elements (for example the transformed filters
    /// returned by `gather_filters_for_pushdown`). The result's index tables
    /// extend this one's, so its results still map back to the original input.
    fn chain_filter_slice<U: Clone, F>(&self, items: &[U], predicate: F) -> FilteredVec<U>
    where
        F: Fn(&U) -> bool,
    {
        let (kept_indices, kept_items): (Vec<usize>, Vec<U>) = items
            .iter()
            .enumerate()
            .filter(|&(_, item)| predicate(item))
            .map(|(idx, item)| (idx, item.clone()))
            .unzip();

        // Extend the parent's chain with the table for this pass.
        let index_mappings: Vec<Vec<usize>> = self
            .index_mappings
            .iter()
            .cloned()
            .chain(std::iter::once(kept_indices))
            .collect();

        FilteredVec {
            items: kept_items,
            index_mappings,
            original_len: self.original_len,
        }
    }
}
704
705fn allow_pushdown_for_expr(expr: &Arc<dyn PhysicalExpr>) -> bool {
706 let mut allow_pushdown = true;
707 expr.apply(|e| {
708 allow_pushdown = allow_pushdown && !is_volatile(e);
709 if allow_pushdown {
710 Ok(TreeNodeRecursion::Continue)
711 } else {
712 Ok(TreeNodeRecursion::Stop)
713 }
714 })
715 .expect("Infallible traversal of PhysicalExpr tree failed");
716 allow_pushdown
717}
718
#[cfg(test)]
mod tests {
    use super::*;

    /// One filter pass: survivors keep their order and results land on the
    /// survivors' original positions.
    #[test]
    fn test_filtered_vec_single_pass() {
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let evens = FilteredVec::new(&numbers, |&n| n % 2 == 0);

        assert_eq!(evens.len(), 3);
        assert_eq!(evens.items(), &[2, 4, 6]);

        let scattered = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(
            scattered,
            vec!["default", "a", "default", "b", "default", "c"]
        );
    }

    /// Nothing passes the filter: every original slot gets the default.
    #[test]
    fn test_filtered_vec_empty_filter() {
        let odds = vec![1, 3, 5];
        let evens = FilteredVec::new(&odds, |&n| n % 2 == 0);

        assert_eq!(evens.len(), 0);
        assert_eq!(evens.items(), &[] as &[i32]);

        let no_results: Vec<&str> = Vec::new();
        let scattered = evens.map_results_to_original(no_results, "default");
        assert_eq!(scattered, vec!["default", "default", "default"]);
    }

    /// Everything passes the filter: results map one-to-one.
    #[test]
    fn test_filtered_vec_all_pass() {
        let numbers = vec![2, 4, 6];
        let evens = FilteredVec::new(&numbers, |&n| n % 2 == 0);

        assert_eq!(evens.len(), 3);
        assert_eq!(evens.items(), &[2, 4, 6]);

        let scattered = evens.map_results_to_original(vec!["a", "b", "c"], "default");
        assert_eq!(scattered, vec!["a", "b", "c"]);
    }

    /// The second pass may run over a different element type than the first.
    #[test]
    fn test_chain_filter_slice_different_types() {
        // First pass over numbers.
        let numbers = vec![1, 2, 3, 4, 5, 6];
        let large = FilteredVec::new(&numbers, |&n| n > 3);
        assert_eq!(large.items(), &[4, 5, 6]);

        // Stand-ins for the survivors (simulating the transformation done by
        // gather_filters_for_pushdown).
        let spelled = vec!["four", "five", "six"];

        // Second pass keeps the strings containing 'i'.
        let with_i = large.chain_filter_slice(&spelled, |word| word.contains('i'));
        assert_eq!(with_i.items(), &["five", "six"]);

        // "five" stands for the number at original index 4, "six" for index 5.
        let scattered = with_i.map_results_to_original(vec![100, 200], 0);
        assert_eq!(scattered, vec![0, 0, 0, 0, 100, 200]);
    }

    /// Mirrors the filter pushdown flow: parent predicates are filtered once,
    /// transformed per child, filtered again, and results are mapped back.
    #[test]
    fn test_chain_filter_slice_complex_scenario() {
        #[derive(Clone, Debug, PartialEq)]
        struct TransformedPredicate {
            name: String,
            can_push: bool,
        }

        let transformed = |name: &str, can_push: bool| TransformedPredicate {
            name: name.to_string(),
            can_push,
        };

        // Parent predicates: [A, B, C, D, E]; B and D are rejected up front
        // (simulating allow_pushdown_for_expr).
        let parent_predicates = vec!["A", "B", "C", "D", "E"];
        let allowed = FilteredVec::new(&parent_predicates, |p| *p != "B" && *p != "D");
        assert_eq!(allowed.items(), &["A", "C", "E"]);

        // The child receives transformed versions of A, C and E, of which
        // only C and E can be pushed.
        let child_predicates = vec![
            transformed("A_transformed", false),
            transformed("C_transformed", true),
            transformed("E_transformed", true),
        ];

        let pushable = allowed.chain_filter_slice(&child_predicates, |p| p.can_push);
        assert_eq!(pushable.len(), 2);
        let pushable_names: Vec<&str> =
            pushable.items().iter().map(|p| p.name.as_str()).collect();
        assert_eq!(pushable_names, ["C_transformed", "E_transformed"]);

        // Results from the child land on C (index 2) and E (index 4).
        let scattered =
            pushable.map_results_to_original(vec!["C_result", "E_result"], "no_result");
        assert_eq!(
            scattered,
            vec![
                "no_result",
                "no_result",
                "C_result",
                "no_result",
                "E_result"
            ]
        );
    }

    /// Filtered positions translate back to the positions in the input.
    #[test]
    fn test_trace_to_original_index() {
        let values = vec![10, 20, 30, 40, 50];
        let kept = FilteredVec::new(&values, |&v| v != 20 && v != 40);

        // Survivors [10, 30, 50] came from original indices [0, 2, 4].
        let traced: Vec<usize> = (0..kept.len())
            .map(|pos| kept.trace_to_original_index(pos))
            .collect();
        assert_eq!(traced, vec![0, 2, 4]);
    }

    /// Chaining keeps the length of the very first input.
    #[test]
    fn test_chain_filter_preserves_original_len() {
        let numbers = vec![1, 2, 3, 4, 5];
        let above_two = FilteredVec::new(&numbers, |&n| n > 2);

        let spelled = vec!["three", "four", "five"];
        let four_letters = above_two.chain_filter_slice(&spelled, |word| word.len() == 4);

        // The mapped output still spans all five original positions.
        let scattered = four_letters.map_results_to_original(vec!["x", "y"], "-");
        assert_eq!(scattered.len(), 5);
    }
}
867}