datafusion_datasource_parquet/row_filter.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities to push down DataFusion filter predicates (any DataFusion
//! `PhysicalExpr` that evaluates to a [`BooleanArray`]) to the parquet decoder
//! level in `arrow-rs`.
//!
//! DataFusion uses a `ParquetRecordBatchStream` to read data from parquet
//! into [`RecordBatch`]es.
//!
//! The `ParquetRecordBatchStream` takes an optional `RowFilter`, which is itself
//! a `Vec` of `Box<dyn ArrowPredicate>`. During decoding, the predicates are
//! evaluated in order to generate a mask that is used to skip decoding rows in
//! projected columns which do not pass the filter. This can significantly
//! reduce the amount of compute required for decoding and thus improve query
//! performance.
//!
//! Since the predicates are applied serially in the order defined in the
//! `RowFilter`, the optimal ordering depends on the exact filters. The best
//! filters to execute first have two properties:
//!
//! 1. They are relatively inexpensive to evaluate (e.g. they read
//!    column chunks which are relatively small)
//!
//! 2. They filter many (contiguous) rows, reducing the amount of decoding
//!    required for subsequent filters and projected columns
//!
//! If requested, this code will reorder the filters based on heuristics to try
//! to reduce the evaluation cost.
//!
//! The basic algorithm for constructing the `RowFilter` is as follows:
//!
//! 1. Break conjunctions into separate predicates. An expression
//!    like `a = 1 AND (b = 2 AND c = 3)` would be
//!    separated into the expressions `a = 1`, `b = 2`, and `c = 3`.
//! 2. Determine whether each predicate can be evaluated as an `ArrowPredicate`.
//! 3. Determine, for each predicate, the total compressed size of all
//!    columns required to evaluate the predicate.
//! 4. Determine, for each predicate, whether all columns required to
//!    evaluate the expression are sorted.
//! 5. Re-order the predicates by total size (from step 3).
//! 6. Partition the predicates according to whether they are sorted (from step 4).
//! 7. "Compile" each predicate `Expr` to a `DatafusionArrowPredicate`.
//! 8. Build the `RowFilter` with the sorted predicates followed by
//!    the unsorted predicates. Within each partition, predicates are
//!    still sorted by size.
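//!
//! # Example (illustrative)
//!
//! A minimal sketch of how the resulting [`RowFilter`] is attached to a
//! parquet reader. The inputs (`predicate`, the schemas, `parquet_metadata`,
//! `file_metrics`, and `schema_adapter_factory`) are assumed to already exist
//! and are not constructed here:
//!
//! ```ignore
//! use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
//!
//! // Split the predicate into conjuncts, keep the ones that can be pushed
//! // down, optionally reorder them, and compile them into a `RowFilter`.
//! let row_filter = build_row_filter(
//!     &predicate,
//!     &physical_file_schema,
//!     &predicate_file_schema,
//!     &parquet_metadata,
//!     true, // reorder_predicates
//!     &file_metrics,
//!     &schema_adapter_factory,
//! )?
//! .expect("at least one conjunct could be pushed down");
//!
//! // Rows that fail an earlier predicate are not decoded for later
//! // predicates or for the projected columns.
//! let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
//!     .with_row_filter(row_filter)
//!     .build()?;
//! ```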

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result;
use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr, PhysicalExprExt};

use datafusion_physical_plan::metrics;

use super::ParquetFileMetrics;

/// A "compiled" predicate passed to `ParquetRecordBatchStream` to perform
/// row-level filtering during parquet decoding.
///
/// See the module level documentation for more information.
///
/// Implements the `ArrowPredicate` trait used by the parquet decoder.
///
/// An expression can be evaluated as a `DatafusionArrowPredicate` if it:
/// * Does not reference any projected columns
/// * Does not reference columns with non-primitive types (e.g. structs / lists)
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
    /// the filter expression
    physical_expr: Arc<dyn PhysicalExpr>,
    /// Path to the columns in the parquet schema required to evaluate the
    /// expression
    projection_mask: ProjectionMask,
    /// how many rows were filtered out by this predicate
    rows_pruned: metrics::Count,
    /// how many rows passed this predicate
    rows_matched: metrics::Count,
    /// how long was spent evaluating this predicate
    time: metrics::Time,
    /// used to perform type coercion while filtering rows
    schema_mapper: Arc<dyn SchemaMapper>,
}

impl DatafusionArrowPredicate {
    /// Create a new `DatafusionArrowPredicate` from a `FilterCandidate`
    pub fn try_new(
        candidate: FilterCandidate,
        metadata: &ParquetMetaData,
        rows_pruned: metrics::Count,
        rows_matched: metrics::Count,
        time: metrics::Time,
    ) -> Result<Self> {
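        // Rewrite column references in the expression so that their indices
        // match the candidate's filter schema (the schema of the batches this
        // predicate will be evaluated against after schema mapping).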
        let physical_expr =
            reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;

        Ok(Self {
            physical_expr,
            projection_mask: ProjectionMask::roots(
                metadata.file_metadata().schema_descr(),
                candidate.projection,
            ),
            rows_pruned,
            rows_matched,
            time,
            schema_mapper: candidate.schema_mapper,
        })
    }
}

impl ArrowPredicate for DatafusionArrowPredicate {
    fn projection(&self) -> &ProjectionMask {
        &self.projection_mask
    }

    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
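        // Map the decoded batch (in the file schema) onto the filter's
        // projected table schema, filling in any missing columns and coercing
        // types as needed, before evaluating the predicate against it.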
        let batch = self.schema_mapper.map_batch(batch)?;

        // scoped timer updates on drop
        let mut timer = self.time.timer();

        self.physical_expr
            .evaluate(&batch)
            .and_then(|v| v.into_array(batch.num_rows()))
            .and_then(|array| {
                let bool_arr = as_boolean_array(&array)?.clone();
                let num_matched = bool_arr.true_count();
                let num_pruned = bool_arr.len() - num_matched;
                self.rows_pruned.add(num_pruned);
                self.rows_matched.add(num_matched);
                timer.stop();
                Ok(bool_arr)
            })
            .map_err(|e| {
                ArrowError::ComputeError(format!(
                    "Error evaluating filter predicate: {e:?}"
                ))
            })
    }
}

/// A candidate expression for creating a `RowFilter`.
///
/// Each candidate contains the expression as well as data to estimate the cost
/// of evaluating the resulting expression.
///
/// See the module level documentation for more information.
pub(crate) struct FilterCandidate {
    expr: Arc<dyn PhysicalExpr>,
    /// Estimate for the total number of bytes that will need to be processed
    /// to evaluate this filter. This is used to estimate the cost of evaluating
    /// the filter and to order the filters when `reorder_predicates` is true.
    /// This is generated by summing the compressed size of all columns that the filter references.
    required_bytes: usize,
    /// Can this filter use an index (e.g. a page index) to prune rows?
    can_use_index: bool,
    /// The projection to read from the file schema to get the columns
    /// required to pass through a `SchemaMapper` to the table schema
    /// upon which we then evaluate the filter expression.
    projection: Vec<usize>,
    /// A `SchemaMapper` used to map batches read from the file schema to
    /// the filter's projection of the table schema.
    schema_mapper: Arc<dyn SchemaMapper>,
    /// The projected table schema that this filter references
    filter_schema: SchemaRef,
}

/// Helper to build a `FilterCandidate`.
///
/// This will do several things:
/// 1. Determine the columns required to evaluate the expression
/// 2. Calculate data required to estimate the cost of evaluating the filter
/// 3. Rewrite column expressions in the predicate which reference columns not
///    in the particular file schema.
///
/// # Schema Rewrite
///
/// When parquet files are read in the context of "schema evolution" there are
/// potentially two schemas:
///
/// 1. The table schema (the columns of the table that the parquet file is part of)
/// 2. The file schema (the columns actually in the parquet file)
///
/// There are times when the table schema contains columns that are not in the
/// file schema, such as when new columns have been added in new parquet files
/// but old files do not have the columns.
///
/// When a file is missing a column from the table schema, the value of the
/// missing column is filled in by a `SchemaAdapter` (by default as `NULL`).
///
/// When a predicate is pushed down to the parquet reader, the predicate is
/// evaluated in the context of the file schema.
/// For each predicate we build a filter schema, which is the projection of the
/// table schema that contains only the columns the filter references.
/// If any of those columns are missing from a particular file they are
/// added by the `SchemaAdapter`, by default as `NULL`.
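///
/// For example (an illustrative scenario, not tied to any particular file):
///
/// ```text
/// table schema : [a: Int64, b: Utf8, c: Date32]
/// file schema  : [a: Int64, b: Utf8]              (older file, no column `c`)
/// predicate    : c IS NOT NULL
/// filter schema: [c: Date32]                      (projection of the table schema)
/// ```
///
/// When such a file is read, the `SchemaAdapter` supplies `c` as `NULL` for
/// every row, so `c IS NOT NULL` evaluates to `false` and no rows pass this
/// predicate for that file.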
struct FilterCandidateBuilder {
    expr: Arc<dyn PhysicalExpr>,
    /// The schema of this parquet file.
    /// Columns may have different types from the table schema and there may be
    /// columns in the file schema that are not in the table schema or columns that
    /// are in the table schema that are not in the file schema.
    file_schema: SchemaRef,
    /// The schema of the table (merged schema) -- columns may be in different
    /// order than in the file and have columns that are not in the file schema
    table_schema: SchemaRef,
    /// A `SchemaAdapterFactory` used to map the file schema to the table schema.
    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}

impl FilterCandidateBuilder {
    pub fn new(
        expr: Arc<dyn PhysicalExpr>,
        file_schema: Arc<Schema>,
        table_schema: Arc<Schema>,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Self {
        Self {
            expr,
            file_schema,
            table_schema,
            schema_adapter_factory,
        }
    }

    /// Attempt to build a `FilterCandidate` from the expression
    ///
    /// # Return values
    ///
    /// * `Ok(Some(candidate))` if the expression can be used as an `ArrowPredicate`
    /// * `Ok(None)` if the expression cannot be used as an `ArrowPredicate`
    /// * `Err(e)` if an error occurs while building the candidate
    pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
        // Bail out if the expression references columns that prevent pushdown
        // (nested types or columns not present in the table schema).
        let Some(required_indices_into_table_schema) =
            pushdown_columns(&self.expr, &self.table_schema)?
        else {
            return Ok(None);
        };

        // The filter schema: the projection of the table schema that contains
        // only the columns this filter references.
        let projected_table_schema = Arc::new(
            self.table_schema
                .project(&required_indices_into_table_schema)?,
        );

        // Map the filter schema onto the file schema, producing the projection
        // of file columns to read and a `SchemaMapper` that adapts decoded batches.
        let (schema_mapper, projection_into_file_schema) = self
            .schema_adapter_factory
            .create(Arc::clone(&projected_table_schema), self.table_schema)
            .map_schema(&self.file_schema)?;

        // Cost estimates used to order the filters (see the module level docs).
        let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
        let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;

        Ok(Some(FilterCandidate {
            expr: self.expr,
            required_bytes,
            can_use_index,
            projection: projection_into_file_schema,
            schema_mapper: Arc::clone(&schema_mapper),
            filter_schema: Arc::clone(&projected_table_schema),
        }))
    }
}

// A struct that implements `TreeNodeVisitor` to traverse a `PhysicalExpr` tree structure and
// determine if any column references in the expression would prevent it from being
// predicate-pushed-down.
// If non_primitive_columns || projected_columns, it can't be pushed down.
// Can't be reused between traversals; each construction must be used only once.
struct PushdownChecker<'schema> {
    /// Does the expression require any non-primitive columns (like structs)?
    non_primitive_columns: bool,
    /// Does the expression reference any columns that are in the table
    /// schema but not in the file schema?
    /// This includes partition columns and projected columns.
    projected_columns: bool,
    /// Indices into the table schema of the columns required to evaluate the expression
    required_columns: BTreeSet<usize>,
    table_schema: &'schema Schema,
}

impl<'schema> PushdownChecker<'schema> {
    fn new(table_schema: &'schema Schema) -> Self {
        Self {
            non_primitive_columns: false,
            projected_columns: false,
            required_columns: BTreeSet::default(),
            table_schema,
        }
    }

    fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
        if let Ok(idx) = self.table_schema.index_of(column_name) {
            self.required_columns.insert(idx);
            if DataType::is_nested(self.table_schema.field(idx).data_type()) {
                self.non_primitive_columns = true;
                return Some(TreeNodeRecursion::Jump);
            }
        } else {
            // If the column does not exist in the (un-projected) table schema then
            // it must be a projected column.
            self.projected_columns = true;
            return Some(TreeNodeRecursion::Jump);
        }

        None
    }

    #[inline]
    fn prevents_pushdown(&self) -> bool {
        self.non_primitive_columns || self.projected_columns
    }

    fn check(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<TreeNodeRecursion> {
        node.apply_with_lambdas_params(|node, lambdas_params| {
            if let Some(column) = node.as_any().downcast_ref::<Column>() {
                if !lambdas_params.contains(column.name()) {
                    if let Some(recursion) = self.check_single_column(column.name()) {
                        return Ok(recursion);
                    }
                }
            }

            Ok(TreeNodeRecursion::Continue)
        })
    }
}

impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
        if let Some(column) = node.as_any().downcast_ref::<Column>() {
            if let Some(recursion) = self.check_single_column(column.name()) {
                return Ok(recursion);
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}

// Checks if a given expression can be pushed down into `DataSourceExec` as opposed to being
// evaluated post-parquet-scan in a `FilterExec`. If it can be pushed down, this returns the
// indices (into `table_schema`) of all columns referenced by the expression so that they can
// be used during the parquet scan; otherwise it returns `None`.
fn pushdown_columns(
    expr: &Arc<dyn PhysicalExpr>,
    table_schema: &Schema,
) -> Result<Option<Vec<usize>>> {
    let mut checker = PushdownChecker::new(table_schema);
    expr.visit(&mut checker)?;
    Ok((!checker.prevents_pushdown())
        .then_some(checker.required_columns.into_iter().collect()))
}

/// Recurses through `expr` as a tree, finds all `Column`s, and checks if any of them would
/// prevent this expression from being pushed down as a predicate. If any of them would, this
/// returns false; otherwise, true.
///
/// Note that the schema passed in here is *not* the physical file schema (as it is not
/// available at that point in time); it is the schema of the table that this expression is
/// being evaluated against, minus any projected columns and partition columns.
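///
/// # Example (illustrative)
///
/// A minimal sketch, assuming a hypothetical single-column schema and the
/// helpers used elsewhere in this crate's tests:
///
/// ```ignore
/// // A predicate over a primitive column that exists in the schema can be
/// // pushed down; a predicate over a missing or nested column cannot.
/// let schema = Schema::new(vec![Field::new("a", DataType::Int64, true)]);
/// let expr = logical2physical(&col("a").eq(lit(1i64)), &schema);
/// assert!(can_expr_be_pushed_down_with_schemas(&expr, &schema));
/// ```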
pub fn can_expr_be_pushed_down_with_schemas(
    expr: &Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
) -> bool {
    match pushdown_columns(expr, file_schema) {
        Ok(Some(_)) => true,
        Ok(None) | Err(_) => false,
    }
}

/// Calculate the total compressed size of all `Column`s required for
/// predicate `Expr`.
///
/// This value represents the total amount of IO required to evaluate the
/// predicate.
fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
    let mut total_size = 0;
    let row_groups = metadata.row_groups();
    // Sum the compressed size of each required column across all row groups.
    for idx in columns {
        for rg in row_groups.iter() {
            total_size += rg.column(*idx).compressed_size() as usize;
        }
    }

    Ok(total_size)
}

/// For a given set of `Column`s required for predicate `Expr` determine whether
/// all columns are sorted.
///
/// Sorted columns may be queried more efficiently in the presence of
/// a PageIndex.
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
    // TODO How do we know this?
    Ok(false)
}

/// Build a [`RowFilter`] from the given predicate `Expr` if possible
///
/// # Returns
/// * `Ok(Some(row_filter))` if the expression can be used as a `RowFilter`
/// * `Ok(None)` if the expression cannot be used as a `RowFilter`
/// * `Err(e)` if an error occurs while building the filter
///
/// Note that the returned `RowFilter` may not contain all conjuncts in the
/// original expression. This is because some conjuncts may not be able to be
/// evaluated as an `ArrowPredicate` and will be ignored.
///
/// For example, if the expression is `a = 1 AND b = 2 AND c = 3` and `b = 2`
/// cannot be evaluated for some reason, the returned `RowFilter` will contain
/// `a = 1` and `c = 3`.
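///
/// An illustrative sketch of that behavior (the schema is hypothetical):
///
/// ```text
/// predicate : a = 1 AND b = 2 AND c = 3
/// column b  : a nested (struct/list) column, so `b = 2` cannot be pushed down
/// RowFilter : [a = 1, c = 3]    -- `b = 2` is evaluated after the scan instead
/// ```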
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    predicate_file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
    schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
) -> Result<Option<RowFilter>> {
    let rows_pruned = &file_metrics.pushdown_rows_pruned;
    let rows_matched = &file_metrics.pushdown_rows_matched;
    let time = &file_metrics.row_pushdown_eval_time;

    // Split into conjuncts:
    // `a = 1 AND b = 2 AND c = 3` -> [`a = 1`, `b = 2`, `c = 3`]
    let predicates = split_conjunction(expr);

    // Determine which conjuncts can be evaluated as ArrowPredicates, if any
    let mut candidates: Vec<FilterCandidate> = predicates
        .into_iter()
        .map(|expr| {
            FilterCandidateBuilder::new(
                Arc::clone(expr),
                Arc::clone(physical_file_schema),
                Arc::clone(predicate_file_schema),
                Arc::clone(schema_adapter_factory),
            )
            .build(metadata)
        })
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .flatten()
        .collect();

    // no candidates
    if candidates.is_empty() {
        return Ok(None);
    }

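    // If requested, reorder the candidates: sort by `can_use_index` and then by
    // `required_bytes`, so that (within each group) predicates that read fewer
    // bytes are evaluated first. See the module level documentation.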
    if reorder_predicates {
        candidates.sort_unstable_by(|c1, c2| {
            match c1.can_use_index.cmp(&c2.can_use_index) {
                Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
                ord => ord,
            }
        });
    }

    candidates
        .into_iter()
        .map(|candidate| {
            DatafusionArrowPredicate::try_new(
                candidate,
                metadata,
                rows_pruned.clone(),
                rows_matched.clone(),
                time.clone(),
            )
            .map(|pred| Box::new(pred) as _)
        })
        .collect::<Result<Vec<_>, _>>()
        .map(|filters| Some(RowFilter::new(filters)))
}

#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::ScalarValue;

    use arrow::datatypes::{Field, TimeUnit::Nanosecond};
    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    use datafusion_expr::{col, Expr};
    use datafusion_physical_expr::planner::logical2physical;
    use datafusion_physical_plan::metrics::{Count, Time};

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::arrow::parquet_to_arrow_schema;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    // We should ignore predicates that read non-primitive columns
    #[test]
    fn test_filter_candidate_builder_ignore_complex_types() {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        let table_schema =
            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
                .expect("parsing schema");

        let expr = col("int64_list").is_not_null();
        let expr = logical2physical(&expr, &table_schema);

        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let table_schema = Arc::new(table_schema.clone());

        let candidate = FilterCandidateBuilder::new(
            expr,
            table_schema.clone(),
            table_schema,
            schema_adapter_factory,
        )
        .build(metadata)
        .expect("building candidate");

        assert!(candidate.is_none());
    }

    #[test]
    fn test_filter_type_coercion() {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
            .expect("opening file");

        let parquet_reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
        let metadata = parquet_reader_builder.metadata().clone();
        let file_schema = parquet_reader_builder.schema().clone();

        // This is the schema we would like to coerce to,
        // which is different from the physical schema of the file.
        let table_schema = Schema::new(vec![Field::new(
            "timestamp_col",
            DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
            false,
        )]);

        // Test all should fail
        let expr = col("timestamp_col").lt(Expr::Literal(
            ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
            None,
        ));
        let expr = logical2physical(&expr, &table_schema);
        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let table_schema = Arc::new(table_schema.clone());
        let candidate = FilterCandidateBuilder::new(
            expr,
            file_schema.clone(),
            table_schema.clone(),
            schema_adapter_factory,
        )
        .build(&metadata)
        .expect("building candidate")
        .expect("candidate expected");

        let mut row_filter = DatafusionArrowPredicate::try_new(
            candidate,
            &metadata,
            Count::new(),
            Count::new(),
            Time::new(),
        )
        .expect("creating filter predicate");

        let mut parquet_reader = parquet_reader_builder
            .with_projection(row_filter.projection().clone())
            .build()
            .expect("building reader");

        // Parquet file is small, we only need 1 record batch
        let first_rb = parquet_reader
            .next()
            .expect("expected record batch")
            .expect("expected error free record batch");

        let filtered = row_filter.evaluate(first_rb.clone());
        assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));

        // Test all should pass
        let expr = col("timestamp_col").gt(Expr::Literal(
            ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
            None,
        ));
        let expr = logical2physical(&expr, &table_schema);
        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let candidate = FilterCandidateBuilder::new(
            expr,
            file_schema,
            table_schema,
            schema_adapter_factory,
        )
        .build(&metadata)
        .expect("building candidate")
        .expect("candidate expected");

        let mut row_filter = DatafusionArrowPredicate::try_new(
            candidate,
            &metadata,
            Count::new(),
            Count::new(),
            Time::new(),
        )
        .expect("creating filter predicate");

        let filtered = row_filter.evaluate(first_rb);
        assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
    }

    #[test]
    fn nested_data_structures_prevent_pushdown() {
        let table_schema = Arc::new(get_lists_table_schema());

        let expr = col("utf8_list").is_not_null();
        let expr = logical2physical(&expr, &table_schema);
        check_expression_can_evaluate_against_schema(&expr, &table_schema);

        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn projected_columns_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr =
            Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;

        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn basic_expr_doesnt_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr = col("string_col").is_null();
        let expr = logical2physical(&expr, &table_schema);

        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn complex_expr_doesnt_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr = col("string_col")
            .is_not_null()
            .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
        let expr = logical2physical(&expr, &table_schema);

        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    fn get_basic_table_schema() -> Schema {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
            .expect("parsing schema")
    }

    fn get_lists_table_schema() -> Schema {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
            .expect("parsing schema")
    }

    /// Sanity check that the given expression can be evaluated against the given schema
    /// without any errors. Returns false if the expression references columns that are not
    /// in the schema or if the column types are incompatible, etc.
    fn check_expression_can_evaluate_against_schema(
        expr: &Arc<dyn PhysicalExpr>,
        table_schema: &Arc<Schema>,
    ) -> bool {
        let batch = RecordBatch::new_empty(Arc::clone(table_schema));
        expr.evaluate(&batch).is_ok()
    }
}