datafusion_datasource_parquet/
row_group_filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use super::{ParquetAccessPlan, ParquetFileMetrics};
22use arrow::array::{ArrayRef, BooleanArray};
23use arrow::datatypes::Schema;
24use datafusion_common::pruning::PruningStatistics;
25use datafusion_common::{Column, Result, ScalarValue};
26use datafusion_datasource::FileRange;
27use datafusion_pruning::PruningPredicate;
28use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
29use parquet::arrow::parquet_column;
30use parquet::basic::Type;
31use parquet::data_type::Decimal;
32use parquet::schema::types::SchemaDescriptor;
33use parquet::{
34    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
35    bloom_filter::Sbbf,
36    file::metadata::RowGroupMetaData,
37};
38
39/// Reduces the [`ParquetAccessPlan`] based on row group level metadata.
40///
41/// This struct implements the various types of pruning that are applied to a
42/// set of row groups within a parquet file, progressively narrowing down the
43/// set of row groups (and ranges/selections within those row groups) that
44/// should be scanned, based on the available metadata.
45#[derive(Debug, Clone, PartialEq)]
46pub struct RowGroupAccessPlanFilter {
47    /// which row groups should be accessed
48    access_plan: ParquetAccessPlan,
49}
50
51impl RowGroupAccessPlanFilter {
52    /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan
53    /// based on metadata and statistics
54    pub fn new(access_plan: ParquetAccessPlan) -> Self {
55        Self { access_plan }
56    }
57
58    /// Return true if there are no row groups
59    pub fn is_empty(&self) -> bool {
60        self.access_plan.is_empty()
61    }
62
63    /// Return the number of row groups that are currently expected to be scanned
64    pub fn remaining_row_group_count(&self) -> usize {
65        self.access_plan.row_group_index_iter().count()
66    }
67
68    /// Returns the inner access plan
69    pub fn build(self) -> ParquetAccessPlan {
70        self.access_plan
71    }
72
73    /// Prune remaining row groups to only those  within the specified range.
74    ///
75    /// Updates this set to mark row groups that should not be scanned
76    ///
77    /// # Panics
78    /// if `groups.len() != self.len()`
79    pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) {
80        assert_eq!(groups.len(), self.access_plan.len());
81        for (idx, metadata) in groups.iter().enumerate() {
82            if !self.access_plan.should_scan(idx) {
83                continue;
84            }
85
86            // Skip the row group if the first dictionary/data page are not
87            // within the range.
88            //
89            // note don't use the location of metadata
90            // <https://github.com/apache/datafusion/issues/5995>
91            let col = metadata.column(0);
92            let offset = col
93                .dictionary_page_offset()
94                .unwrap_or_else(|| col.data_page_offset());
95            if !range.contains(offset) {
96                self.access_plan.skip(idx);
97            }
98        }
99    }
100    /// Prune remaining row groups using min/max/null_count statistics and
101    /// the [`PruningPredicate`] to determine if the predicate can not be true.
102    ///
103    /// Updates this set to mark row groups that should not be scanned
104    ///
105    /// Note: This method currently ignores ColumnOrder
106    /// <https://github.com/apache/datafusion/issues/8335>
107    ///
108    /// # Panics
109    /// if `groups.len() != self.len()`
110    pub fn prune_by_statistics(
111        &mut self,
112        arrow_schema: &Schema,
113        parquet_schema: &SchemaDescriptor,
114        groups: &[RowGroupMetaData],
115        predicate: &PruningPredicate,
116        metrics: &ParquetFileMetrics,
117    ) {
118        // scoped timer updates on drop
119        let _timer_guard = metrics.statistics_eval_time.timer();
120
121        assert_eq!(groups.len(), self.access_plan.len());
122        // Indexes of row groups still to scan
123        let row_group_indexes = self.access_plan.row_group_indexes();
124        let row_group_metadatas = row_group_indexes
125            .iter()
126            .map(|&i| &groups[i])
127            .collect::<Vec<_>>();
128
129        let pruning_stats = RowGroupPruningStatistics {
130            parquet_schema,
131            row_group_metadatas,
132            arrow_schema,
133        };
134
135        // try to prune the row groups in a single call
136        match predicate.prune(&pruning_stats) {
137            Ok(values) => {
138                // values[i] is false means the predicate could not be true for row group i
139                for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
140                    if !value {
141                        self.access_plan.skip(*idx);
142                        metrics.row_groups_pruned_statistics.add_pruned(1);
143                    } else {
144                        metrics.row_groups_pruned_statistics.add_matched(1);
145                    }
146                }
147            }
148            // stats filter array could not be built, so we can't prune
149            Err(e) => {
150                log::debug!("Error evaluating row group predicate values {e}");
151                metrics.predicate_evaluation_errors.add(1);
152            }
153        }
154    }
155
156    /// Prune remaining row groups using available bloom filters and the
157    /// [`PruningPredicate`].
158    ///
159    /// Updates this set with row groups that should not be scanned
160    ///
161    /// # Panics
162    /// if the builder does not have the same number of row groups as this set
163    pub async fn prune_by_bloom_filters<T: AsyncFileReader + Send + 'static>(
164        &mut self,
165        arrow_schema: &Schema,
166        builder: &mut ParquetRecordBatchStreamBuilder<T>,
167        predicate: &PruningPredicate,
168        metrics: &ParquetFileMetrics,
169    ) {
170        // scoped timer updates on drop
171        let _timer_guard = metrics.bloom_filter_eval_time.timer();
172
173        assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len());
174        for idx in 0..self.access_plan.len() {
175            if !self.access_plan.should_scan(idx) {
176                continue;
177            }
178
179            // Attempt to find bloom filters for filtering this row group
180            let literal_columns = predicate.literal_columns();
181            let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
182
183            for column_name in literal_columns {
184                let Some((column_idx, _field)) =
185                    parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
186                else {
187                    continue;
188                };
189
190                let bf = match builder
191                    .get_row_group_column_bloom_filter(idx, column_idx)
192                    .await
193                {
194                    Ok(Some(bf)) => bf,
195                    Ok(None) => continue, // no bloom filter for this column
196                    Err(e) => {
197                        log::debug!("Ignoring error reading bloom filter: {e}");
198                        metrics.predicate_evaluation_errors.add(1);
199                        continue;
200                    }
201                };
202                let physical_type =
203                    builder.parquet_schema().column(column_idx).physical_type();
204
205                column_sbbf.insert(column_name.to_string(), (bf, physical_type));
206            }
207
208            let stats = BloomFilterStatistics { column_sbbf };
209
210            // Can this group be pruned?
211            let prune_group = match predicate.prune(&stats) {
212                Ok(values) => !values[0],
213                Err(e) => {
214                    log::debug!(
215                        "Error evaluating row group predicate on bloom filter: {e}"
216                    );
217                    metrics.predicate_evaluation_errors.add(1);
218                    false
219                }
220            };
221
222            if prune_group {
223                metrics.row_groups_pruned_bloom_filter.add_pruned(1);
224                self.access_plan.skip(idx)
225            } else {
226                metrics.row_groups_pruned_bloom_filter.add_matched(1);
227            }
228        }
229    }
230}
231/// Implements [`PruningStatistics`] for Parquet Split Block Bloom Filters (SBBF)
232struct BloomFilterStatistics {
233    /// Maps column name to the parquet bloom filter and parquet physical type
234    column_sbbf: HashMap<String, (Sbbf, Type)>,
235}
236
237impl BloomFilterStatistics {
238    /// Helper function for checking if [`Sbbf`] filter contains [`ScalarValue`].
239    ///
240    /// In case the type of scalar is not supported, returns `true`, assuming that the
241    /// value may be present.
242    fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
243        match value {
244            ScalarValue::Utf8(Some(v))
245            | ScalarValue::Utf8View(Some(v))
246            | ScalarValue::LargeUtf8(Some(v)) => sbbf.check(&v.as_str()),
247            ScalarValue::Binary(Some(v))
248            | ScalarValue::BinaryView(Some(v))
249            | ScalarValue::LargeBinary(Some(v)) => sbbf.check(v),
250            ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
251            ScalarValue::Boolean(Some(v)) => sbbf.check(v),
252            ScalarValue::Float64(Some(v)) => sbbf.check(v),
253            ScalarValue::Float32(Some(v)) => sbbf.check(v),
254            ScalarValue::Int64(Some(v)) => sbbf.check(v),
255            ScalarValue::Int32(Some(v)) => sbbf.check(v),
256            ScalarValue::UInt64(Some(v)) => sbbf.check(v),
257            ScalarValue::UInt32(Some(v)) => sbbf.check(v),
258            ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
259                Type::INT32 => {
260                    //https://github.com/apache/parquet-format/blob/eb4b31c1d64a01088d02a2f9aefc6c17c54cc6fc/Encodings.md?plain=1#L35-L42
261                    // All physical type  are little-endian
262                    if *p > 9 {
263                        //DECIMAL can be used to annotate the following types:
264                        //
265                        // int32: for 1 <= precision <= 9
266                        // int64: for 1 <= precision <= 18
267                        return true;
268                    }
269                    let b = (*v as i32).to_le_bytes();
270                    // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
271                    let decimal = Decimal::Int32 {
272                        value: b,
273                        precision: *p as i32,
274                        scale: *s as i32,
275                    };
276                    sbbf.check(&decimal)
277                }
278                Type::INT64 => {
279                    if *p > 18 {
280                        return true;
281                    }
282                    let b = (*v as i64).to_le_bytes();
283                    let decimal = Decimal::Int64 {
284                        value: b,
285                        precision: *p as i32,
286                        scale: *s as i32,
287                    };
288                    sbbf.check(&decimal)
289                }
290                Type::FIXED_LEN_BYTE_ARRAY => {
291                    // keep with from_bytes_to_i128
292                    let b = v.to_be_bytes().to_vec();
293                    // Use Decimal constructor after https://github.com/apache/arrow-rs/issues/5325
294                    let decimal = Decimal::Bytes {
295                        value: b.into(),
296                        precision: *p as i32,
297                        scale: *s as i32,
298                    };
299                    sbbf.check(&decimal)
300                }
301                _ => true,
302            },
303            ScalarValue::Dictionary(_, inner) => {
304                BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
305            }
306            _ => true,
307        }
308    }
309}
310
311impl PruningStatistics for BloomFilterStatistics {
312    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
313        None
314    }
315
316    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
317        None
318    }
319
320    fn num_containers(&self) -> usize {
321        1
322    }
323
324    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
325        None
326    }
327
328    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
329        None
330    }
331
332    /// Use bloom filters to determine if we are sure this column can not
333    /// possibly contain `values`
334    ///
335    /// The `contained` API returns false if the bloom filters knows that *ALL*
336    /// of the values in a column are not present.
337    fn contained(
338        &self,
339        column: &Column,
340        values: &HashSet<ScalarValue>,
341    ) -> Option<BooleanArray> {
342        let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
343
344        // Bloom filters are probabilistic data structures that can return false
345        // positives (i.e. it might return true even if the value is not
346        // present) however, the bloom filter will return `false` if the value is
347        // definitely not present.
348
349        let known_not_present = values
350            .iter()
351            .map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
352            // The row group doesn't contain any of the values if
353            // all the checks are false
354            .all(|v| !v);
355
356        let contains = if known_not_present {
357            Some(false)
358        } else {
359            // Given the bloom filter is probabilistic, we can't be sure that
360            // the row group actually contains the values. Return `None` to
361            // indicate this uncertainty
362            None
363        };
364
365        Some(BooleanArray::from(vec![contains]))
366    }
367}
368
369/// Wraps a slice of [`RowGroupMetaData`] in a way that implements [`PruningStatistics`]
370struct RowGroupPruningStatistics<'a> {
371    parquet_schema: &'a SchemaDescriptor,
372    row_group_metadatas: Vec<&'a RowGroupMetaData>,
373    arrow_schema: &'a Schema,
374}
375
376impl<'a> RowGroupPruningStatistics<'a> {
377    /// Return an iterator over the row group metadata
378    fn metadata_iter(&'a self) -> impl Iterator<Item = &'a RowGroupMetaData> + 'a {
379        self.row_group_metadatas.iter().copied()
380    }
381
382    fn statistics_converter<'b>(
383        &'a self,
384        column: &'b Column,
385    ) -> Result<StatisticsConverter<'a>> {
386        Ok(StatisticsConverter::try_new(
387            &column.name,
388            self.arrow_schema,
389            self.parquet_schema,
390        )?)
391    }
392}
393
394impl PruningStatistics for RowGroupPruningStatistics<'_> {
395    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
396        self.statistics_converter(column)
397            .and_then(|c| Ok(c.row_group_mins(self.metadata_iter())?))
398            .ok()
399    }
400
401    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
402        self.statistics_converter(column)
403            .and_then(|c| Ok(c.row_group_maxes(self.metadata_iter())?))
404            .ok()
405    }
406
407    fn num_containers(&self) -> usize {
408        self.row_group_metadatas.len()
409    }
410
411    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
412        self.statistics_converter(column)
413            .and_then(|c| Ok(c.row_group_null_counts(self.metadata_iter())?))
414            .ok()
415            .map(|counts| Arc::new(counts) as ArrayRef)
416    }
417
418    fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
419        // row counts are the same for all columns in a row group
420        self.statistics_converter(column)
421            .and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?))
422            .ok()
423            .flatten()
424            .map(|counts| Arc::new(counts) as ArrayRef)
425    }
426
427    fn contained(
428        &self,
429        _column: &Column,
430        _values: &HashSet<ScalarValue>,
431    ) -> Option<BooleanArray> {
432        None
433    }
434}
435
436#[cfg(test)]
437mod tests {
438    use std::ops::Rem;
439    use std::sync::Arc;
440
441    use super::*;
442    use crate::reader::ParquetFileReader;
443
444    use arrow::datatypes::DataType::Decimal128;
445    use arrow::datatypes::{DataType, Field};
446    use datafusion_common::Result;
447    use datafusion_expr::{cast, col, lit, Expr};
448    use datafusion_physical_expr::planner::logical2physical;
449    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
450    use parquet::arrow::async_reader::ParquetObjectReader;
451    use parquet::arrow::ArrowSchemaConverter;
452    use parquet::basic::LogicalType;
453    use parquet::data_type::{ByteArray, FixedLenByteArray};
454    use parquet::file::metadata::ColumnChunkMetaData;
455    use parquet::{
456        basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
457        schema::types::SchemaDescPtr,
458    };
459
460    struct PrimitiveTypeField {
461        name: &'static str,
462        physical_ty: PhysicalType,
463        logical_ty: Option<LogicalType>,
464        precision: Option<i32>,
465        scale: Option<i32>,
466        byte_len: Option<i32>,
467    }
468
469    impl PrimitiveTypeField {
470        fn new(name: &'static str, physical_ty: PhysicalType) -> Self {
471            Self {
472                name,
473                physical_ty,
474                logical_ty: None,
475                precision: None,
476                scale: None,
477                byte_len: None,
478            }
479        }
480
481        fn with_logical_type(mut self, logical_type: LogicalType) -> Self {
482            self.logical_ty = Some(logical_type);
483            self
484        }
485
486        fn with_precision(mut self, precision: i32) -> Self {
487            self.precision = Some(precision);
488            self
489        }
490
491        fn with_scale(mut self, scale: i32) -> Self {
492            self.scale = Some(scale);
493            self
494        }
495
496        fn with_byte_len(mut self, byte_len: i32) -> Self {
497            self.byte_len = Some(byte_len);
498            self
499        }
500    }
501
502    #[test]
503    fn remaining_row_group_count_reports_non_skipped_groups() {
504        let mut filter = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(4));
505        assert_eq!(filter.remaining_row_group_count(), 4);
506
507        filter.access_plan.skip(1);
508        assert_eq!(filter.remaining_row_group_count(), 3);
509
510        filter.access_plan.skip(3);
511        assert_eq!(filter.remaining_row_group_count(), 2);
512    }
513
514    #[test]
515    fn row_group_pruning_predicate_simple_expr() {
516        use datafusion_expr::{col, lit};
517        // int > 1 => c1_max > 1
518        let schema =
519            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
520        let expr = col("c1").gt(lit(15));
521        let expr = logical2physical(&expr, &schema);
522        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
523
524        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
525        let schema_descr = get_test_schema_descr(vec![field]);
526        let rgm1 = get_row_group_meta_data(
527            &schema_descr,
528            vec![ParquetStatistics::int32(
529                Some(1),
530                Some(10),
531                None,
532                Some(0),
533                false,
534            )],
535        );
536        let rgm2 = get_row_group_meta_data(
537            &schema_descr,
538            vec![ParquetStatistics::int32(
539                Some(11),
540                Some(20),
541                None,
542                Some(0),
543                false,
544            )],
545        );
546
547        let metrics = parquet_file_metrics();
548        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
549        row_groups.prune_by_statistics(
550            &schema,
551            &schema_descr,
552            &[rgm1, rgm2],
553            &pruning_predicate,
554            &metrics,
555        );
556        assert_pruned(row_groups, ExpectedPruning::Some(vec![1]))
557    }
558
559    #[test]
560    fn row_group_pruning_predicate_missing_stats() {
561        use datafusion_expr::{col, lit};
562        // int > 1 => c1_max > 1
563        let schema =
564            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
565        let expr = col("c1").gt(lit(15));
566        let expr = logical2physical(&expr, &schema);
567        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
568
569        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
570        let schema_descr = get_test_schema_descr(vec![field]);
571        let rgm1 = get_row_group_meta_data(
572            &schema_descr,
573            vec![ParquetStatistics::int32(None, None, None, Some(0), false)],
574        );
575        let rgm2 = get_row_group_meta_data(
576            &schema_descr,
577            vec![ParquetStatistics::int32(
578                Some(11),
579                Some(20),
580                None,
581                Some(0),
582                false,
583            )],
584        );
585        let metrics = parquet_file_metrics();
586        // missing statistics for first row group mean that the result from the predicate expression
587        // is null / undefined so the first row group can't be filtered out
588        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
589        row_groups.prune_by_statistics(
590            &schema,
591            &schema_descr,
592            &[rgm1, rgm2],
593            &pruning_predicate,
594            &metrics,
595        );
596        assert_pruned(row_groups, ExpectedPruning::None);
597    }
598
599    #[test]
600    fn row_group_pruning_predicate_partial_expr() {
601        use datafusion_expr::{col, lit};
602        // test row group predicate with partially supported expression
603        // (int > 1) and ((int % 2) = 0) => c1_max > 1 and true
604        let schema = Arc::new(Schema::new(vec![
605            Field::new("c1", DataType::Int32, false),
606            Field::new("c2", DataType::Int32, false),
607        ]));
608        let expr = col("c1").gt(lit(15)).and(col("c2").rem(lit(2)).eq(lit(0)));
609        let expr = logical2physical(&expr, &schema);
610        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
611
612        let schema_descr = get_test_schema_descr(vec![
613            PrimitiveTypeField::new("c1", PhysicalType::INT32),
614            PrimitiveTypeField::new("c2", PhysicalType::INT32),
615        ]);
616        let rgm1 = get_row_group_meta_data(
617            &schema_descr,
618            vec![
619                ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
620                ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
621            ],
622        );
623        let rgm2 = get_row_group_meta_data(
624            &schema_descr,
625            vec![
626                ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
627                ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
628            ],
629        );
630
631        let metrics = parquet_file_metrics();
632        let groups = &[rgm1, rgm2];
633        // the first row group is still filtered out because the predicate expression can be partially evaluated
634        // when conditions are joined using AND
635        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
636        row_groups.prune_by_statistics(
637            &schema,
638            &schema_descr,
639            groups,
640            &pruning_predicate,
641            &metrics,
642        );
643        assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
644
645        // if conditions in predicate are joined with OR and an unsupported expression is used
646        // this bypasses the entire predicate expression and no row groups are filtered out
647        let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
648        let expr = logical2physical(&expr, &schema);
649        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
650
651        // if conditions in predicate are joined with OR and an unsupported expression is used
652        // this bypasses the entire predicate expression and no row groups are filtered out
653        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
654        row_groups.prune_by_statistics(
655            &schema,
656            &schema_descr,
657            groups,
658            &pruning_predicate,
659            &metrics,
660        );
661        assert_pruned(row_groups, ExpectedPruning::None);
662    }
663
664    #[test]
665    fn row_group_pruning_predicate_file_schema() {
666        use datafusion_expr::{col, lit};
667        // test row group predicate when file schema is different than table schema
668        // c1 > 0
669        let table_schema = Arc::new(Schema::new(vec![
670            Field::new("c1", DataType::Int32, false),
671            Field::new("c2", DataType::Int32, false),
672        ]));
673        let expr = col("c1").gt(lit(0));
674        let expr = logical2physical(&expr, &table_schema);
675        let pruning_predicate =
676            PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
677
678        // Model a file schema's column order c2 then c1, which is the opposite
679        // of the table schema
680        let file_schema = Arc::new(Schema::new(vec![
681            Field::new("c2", DataType::Int32, false),
682            Field::new("c1", DataType::Int32, false),
683        ]));
684        let schema_descr = get_test_schema_descr(vec![
685            PrimitiveTypeField::new("c2", PhysicalType::INT32),
686            PrimitiveTypeField::new("c1", PhysicalType::INT32),
687        ]);
688        // rg1 has c2 less than zero, c1 greater than zero
689        let rgm1 = get_row_group_meta_data(
690            &schema_descr,
691            vec![
692                ParquetStatistics::int32(Some(-10), Some(-1), None, Some(0), false), // c2
693                ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
694            ],
695        );
696        // rg1 has c2 greater than zero, c1 less than zero
697        let rgm2 = get_row_group_meta_data(
698            &schema_descr,
699            vec![
700                ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
701                ParquetStatistics::int32(Some(-10), Some(-1), None, Some(0), false),
702            ],
703        );
704
705        let metrics = parquet_file_metrics();
706        let groups = &[rgm1, rgm2];
707        // the first row group should be left because c1 is greater than zero
708        // the second should be filtered out because c1 is less than zero
709        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
710        row_groups.prune_by_statistics(
711            &file_schema,
712            &schema_descr,
713            groups,
714            &pruning_predicate,
715            &metrics,
716        );
717        assert_pruned(row_groups, ExpectedPruning::Some(vec![0]));
718    }
719
720    fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
721        let schema_descr = get_test_schema_descr(vec![
722            PrimitiveTypeField::new("c1", PhysicalType::INT32),
723            PrimitiveTypeField::new("c2", PhysicalType::BOOLEAN),
724        ]);
725        let rgm1 = get_row_group_meta_data(
726            &schema_descr,
727            vec![
728                ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
729                ParquetStatistics::boolean(Some(false), Some(true), None, Some(0), false),
730            ],
731        );
732        let rgm2 = get_row_group_meta_data(
733            &schema_descr,
734            vec![
735                ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
736                ParquetStatistics::boolean(Some(false), Some(true), None, Some(1), false),
737            ],
738        );
739        vec![rgm1, rgm2]
740    }
741
742    #[test]
743    fn row_group_pruning_predicate_null_expr() {
744        use datafusion_expr::{col, lit};
745        // int > 1 and IsNull(bool) => c1_max > 1 and bool_null_count > 0
746        let schema = Arc::new(Schema::new(vec![
747            Field::new("c1", DataType::Int32, false),
748            Field::new("c2", DataType::Boolean, false),
749        ]));
750        let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
751        let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
752        let expr = logical2physical(&expr, &schema);
753        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
754        let groups = gen_row_group_meta_data_for_pruning_predicate();
755
756        let metrics = parquet_file_metrics();
757        // First row group was filtered out because it contains no null value on "c2".
758        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
759        row_groups.prune_by_statistics(
760            &schema,
761            &schema_descr,
762            &groups,
763            &pruning_predicate,
764            &metrics,
765        );
766        assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
767    }
768
769    #[test]
770    fn row_group_pruning_predicate_eq_null_expr() {
771        use datafusion_expr::{col, lit};
772        // test row group predicate with an unknown (Null) expr
773        //
774        // int > 1 and bool = NULL => c1_max > 1 and null
775        let schema = Arc::new(Schema::new(vec![
776            Field::new("c1", DataType::Int32, false),
777            Field::new("c2", DataType::Boolean, false),
778        ]));
779        let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
780        let expr = col("c1")
781            .gt(lit(15))
782            .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
783        let expr = logical2physical(&expr, &schema);
784        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
785        let groups = gen_row_group_meta_data_for_pruning_predicate();
786
787        let metrics = parquet_file_metrics();
788        // bool = NULL always evaluates to NULL (and thus will not
789        // pass predicates. Ideally these should both be false
790        let mut row_groups =
791            RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(groups.len()));
792        row_groups.prune_by_statistics(
793            &schema,
794            &schema_descr,
795            &groups,
796            &pruning_predicate,
797            &metrics,
798        );
799        assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
800    }
801
802    #[test]
803    fn row_group_pruning_predicate_decimal_type() {
804        // For the decimal data type, parquet can use `INT32`, `INT64`, `BYTE_ARRAY`, `FIXED_LENGTH_BYTE_ARRAY` to
805        // store the data.
806        // In this case, construct four types of statistics to filtered with the decimal predication.
807
808        // INT32: c1 > 5, the c1 is decimal(9,2)
809        // The type of scalar value if decimal(9,2), don't need to do cast
810        let schema =
811            Arc::new(Schema::new(vec![Field::new("c1", Decimal128(9, 2), false)]));
812        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
813            .with_logical_type(LogicalType::Decimal {
814                scale: 2,
815                precision: 9,
816            })
817            .with_scale(2)
818            .with_precision(9);
819        let schema_descr = get_test_schema_descr(vec![field]);
820        let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
821        let expr = logical2physical(&expr, &schema);
822        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
823        let rgm1 = get_row_group_meta_data(
824            &schema_descr,
825            // [1.00, 6.00]
826            // c1 > 5, this row group will be included in the results.
827            vec![ParquetStatistics::int32(
828                Some(100),
829                Some(600),
830                None,
831                Some(0),
832                false,
833            )],
834        );
835        let rgm2 = get_row_group_meta_data(
836            &schema_descr,
837            // [0.1, 0.2]
838            // c1 > 5, this row group will not be included in the results.
839            vec![ParquetStatistics::int32(
840                Some(10),
841                Some(20),
842                None,
843                Some(0),
844                false,
845            )],
846        );
847        let rgm3 = get_row_group_meta_data(
848            &schema_descr,
849            // [1, None]
850            // c1 > 5, this row group can not be filtered out, so will be included in the results.
851            vec![ParquetStatistics::int32(
852                Some(100),
853                None,
854                None,
855                Some(0),
856                false,
857            )],
858        );
859        let metrics = parquet_file_metrics();
860        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
861        row_groups.prune_by_statistics(
862            &schema,
863            &schema_descr,
864            &[rgm1, rgm2, rgm3],
865            &pruning_predicate,
866            &metrics,
867        );
868        assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 2]));
869    }
870
871    #[test]
872    fn row_group_pruning_predicate_decimal_type2() {
873        // INT32: c1 > 5, but parquet decimal type has different precision or scale to arrow decimal
874        // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
875        // We should convert all type to the coercion type, which is decimal(11,2)
876        // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
877        let schema =
878            Arc::new(Schema::new(vec![Field::new("c1", Decimal128(9, 0), false)]));
879
880        let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
881            .with_logical_type(LogicalType::Decimal {
882                scale: 0,
883                precision: 9,
884            })
885            .with_scale(0)
886            .with_precision(9);
887        let schema_descr = get_test_schema_descr(vec![field]);
888        let expr = cast(col("c1"), Decimal128(11, 2)).gt(cast(
889            lit(ScalarValue::Decimal128(Some(500), 5, 2)),
890            Decimal128(11, 2),
891        ));
892        let expr = logical2physical(&expr, &schema);
893        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
894        let rgm1 = get_row_group_meta_data(
895            &schema_descr,
896            // [100, 600]
897            // c1 > 5, this row group will be included in the results.
898            vec![ParquetStatistics::int32(
899                Some(100),
900                Some(600),
901                None,
902                Some(0),
903                false,
904            )],
905        );
906        let rgm2 = get_row_group_meta_data(
907            &schema_descr,
908            // [10, 20]
909            // c1 > 5, this row group will be included in the results.
910            vec![ParquetStatistics::int32(
911                Some(10),
912                Some(20),
913                None,
914                Some(0),
915                false,
916            )],
917        );
918        let rgm3 = get_row_group_meta_data(
919            &schema_descr,
920            // [0, 2]
921            // c1 > 5, this row group will not be included in the results.
922            vec![ParquetStatistics::int32(
923                Some(0),
924                Some(2),
925                None,
926                Some(0),
927                false,
928            )],
929        );
930        let rgm4 = get_row_group_meta_data(
931            &schema_descr,
932            // [None, 2]
933            // c1 > 5, this row group will also not be included in the results
934            // (the min value is unknown, but the max value is 2, so no values can be greater than 5)
935            vec![ParquetStatistics::int32(
936                None,
937                Some(2),
938                None,
939                Some(0),
940                false,
941            )],
942        );
943        let rgm5 = get_row_group_meta_data(
944            &schema_descr,
945            // [2, None]
946            // c1 > 5, this row group must be included
947            // (the min value is 2, but the max value is unknown, so it may have values greater than 5)
948            vec![ParquetStatistics::int32(
949                Some(2),
950                None,
951                None,
952                Some(0),
953                false,
954            )],
955        );
956        let metrics = parquet_file_metrics();
957        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(5));
958        row_groups.prune_by_statistics(
959            &schema,
960            &schema_descr,
961            &[rgm1, rgm2, rgm3, rgm4, rgm5],
962            &pruning_predicate,
963            &metrics,
964        );
965        assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 1, 4]));
966    }
967    #[test]
968    fn row_group_pruning_predicate_decimal_type3() {
969        // INT64: c1 < 5, the c1 is decimal(18,2)
970        let schema = Arc::new(Schema::new(vec![Field::new(
971            "c1",
972            Decimal128(18, 2),
973            false,
974        )]));
975        let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
976            .with_logical_type(LogicalType::Decimal {
977                scale: 2,
978                precision: 18,
979            })
980            .with_scale(2)
981            .with_precision(18);
982        let schema_descr = get_test_schema_descr(vec![field]);
983        let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
984        let expr = logical2physical(&expr, &schema);
985        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
986        let rgm1 = get_row_group_meta_data(
987            &schema_descr,
988            // [6.00, 8.00]
989            vec![ParquetStatistics::int32(
990                Some(600),
991                Some(800),
992                None,
993                Some(0),
994                false,
995            )],
996        );
997        let rgm2 = get_row_group_meta_data(
998            &schema_descr,
999            // [0.1, 0.2]
1000            vec![ParquetStatistics::int64(
1001                Some(10),
1002                Some(20),
1003                None,
1004                Some(0),
1005                false,
1006            )],
1007        );
1008        let rgm3 = get_row_group_meta_data(
1009            &schema_descr,
1010            // [0.1, 0.2]
1011            vec![ParquetStatistics::int64(None, None, None, Some(0), false)],
1012        );
1013        let metrics = parquet_file_metrics();
1014        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1015        row_groups.prune_by_statistics(
1016            &schema,
1017            &schema_descr,
1018            &[rgm1, rgm2, rgm3],
1019            &pruning_predicate,
1020            &metrics,
1021        );
1022        assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1023    }
1024    #[test]
1025    fn row_group_pruning_predicate_decimal_type4() {
1026        // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
1027        // the type of parquet is decimal(18,2)
1028        let schema = Arc::new(Schema::new(vec![Field::new(
1029            "c1",
1030            Decimal128(18, 2),
1031            false,
1032        )]));
1033        let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1034            .with_logical_type(LogicalType::Decimal {
1035                scale: 2,
1036                precision: 18,
1037            })
1038            .with_scale(2)
1039            .with_precision(18)
1040            .with_byte_len(16);
1041        let schema_descr = get_test_schema_descr(vec![field]);
1042        // cast the type of c1 to decimal(28,3)
1043        let left = cast(col("c1"), Decimal128(28, 3));
1044        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
1045        let expr = logical2physical(&expr, &schema);
1046        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
1047        // we must use the big-endian when encode the i128 to bytes or vec[u8].
1048        let rgm1 = get_row_group_meta_data(
1049            &schema_descr,
1050            vec![ParquetStatistics::fixed_len_byte_array(
1051                // 5.00
1052                Some(FixedLenByteArray::from(ByteArray::from(
1053                    500i128.to_be_bytes().to_vec(),
1054                ))),
1055                // 80.00
1056                Some(FixedLenByteArray::from(ByteArray::from(
1057                    8000i128.to_be_bytes().to_vec(),
1058                ))),
1059                None,
1060                Some(0),
1061                false,
1062            )],
1063        );
1064        let rgm2 = get_row_group_meta_data(
1065            &schema_descr,
1066            vec![ParquetStatistics::fixed_len_byte_array(
1067                // 5.00
1068                Some(FixedLenByteArray::from(ByteArray::from(
1069                    500i128.to_be_bytes().to_vec(),
1070                ))),
1071                // 200.00
1072                Some(FixedLenByteArray::from(ByteArray::from(
1073                    20000i128.to_be_bytes().to_vec(),
1074                ))),
1075                None,
1076                Some(0),
1077                false,
1078            )],
1079        );
1080
1081        let rgm3 = get_row_group_meta_data(
1082            &schema_descr,
1083            vec![ParquetStatistics::fixed_len_byte_array(
1084                None,
1085                None,
1086                None,
1087                Some(0),
1088                false,
1089            )],
1090        );
1091        let metrics = parquet_file_metrics();
1092        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1093        row_groups.prune_by_statistics(
1094            &schema,
1095            &schema_descr,
1096            &[rgm1, rgm2, rgm3],
1097            &pruning_predicate,
1098            &metrics,
1099        );
1100        assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1101    }
1102    #[test]
1103    fn row_group_pruning_predicate_decimal_type5() {
1104        // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
1105        // the type of parquet is decimal(18,2)
1106        let schema = Arc::new(Schema::new(vec![Field::new(
1107            "c1",
1108            Decimal128(18, 2),
1109            false,
1110        )]));
1111        let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
1112            .with_logical_type(LogicalType::Decimal {
1113                scale: 2,
1114                precision: 18,
1115            })
1116            .with_scale(2)
1117            .with_precision(18)
1118            .with_byte_len(16);
1119        let schema_descr = get_test_schema_descr(vec![field]);
1120        // cast the type of c1 to decimal(28,3)
1121        let left = cast(col("c1"), Decimal128(28, 3));
1122        let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
1123        let expr = logical2physical(&expr, &schema);
1124        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
1125        // we must use the big-endian when encode the i128 to bytes or vec[u8].
1126        let rgm1 = get_row_group_meta_data(
1127            &schema_descr,
1128            vec![ParquetStatistics::byte_array(
1129                // 5.00
1130                Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
1131                // 80.00
1132                Some(ByteArray::from(8000i128.to_be_bytes().to_vec())),
1133                None,
1134                Some(0),
1135                false,
1136            )],
1137        );
1138        let rgm2 = get_row_group_meta_data(
1139            &schema_descr,
1140            vec![ParquetStatistics::byte_array(
1141                // 5.00
1142                Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
1143                // 200.00
1144                Some(ByteArray::from(20000i128.to_be_bytes().to_vec())),
1145                None,
1146                Some(0),
1147                false,
1148            )],
1149        );
1150        let rgm3 = get_row_group_meta_data(
1151            &schema_descr,
1152            vec![ParquetStatistics::byte_array(
1153                None,
1154                None,
1155                None,
1156                Some(0),
1157                false,
1158            )],
1159        );
1160        let metrics = parquet_file_metrics();
1161        let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1162        row_groups.prune_by_statistics(
1163            &schema,
1164            &schema_descr,
1165            &[rgm1, rgm2, rgm3],
1166            &pruning_predicate,
1167            &metrics,
1168        );
1169        assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1170    }
1171
1172    fn get_row_group_meta_data(
1173        schema_descr: &SchemaDescPtr,
1174        column_statistics: Vec<ParquetStatistics>,
1175    ) -> RowGroupMetaData {
1176        let mut columns = vec![];
1177        let number_row = 1000;
1178        for (i, s) in column_statistics.iter().enumerate() {
1179            let column = ColumnChunkMetaData::builder(schema_descr.column(i))
1180                .set_statistics(s.clone())
1181                .set_num_values(number_row)
1182                .build()
1183                .unwrap();
1184            columns.push(column);
1185        }
1186        RowGroupMetaData::builder(schema_descr.clone())
1187            .set_num_rows(number_row)
1188            .set_total_byte_size(2000)
1189            .set_column_metadata(columns)
1190            .build()
1191            .unwrap()
1192    }
1193
1194    fn get_test_schema_descr(fields: Vec<PrimitiveTypeField>) -> SchemaDescPtr {
1195        use parquet::schema::types::Type as SchemaType;
1196        let schema_fields = fields
1197            .iter()
1198            .map(|field| {
1199                let mut builder =
1200                    SchemaType::primitive_type_builder(field.name, field.physical_ty);
1201                // add logical type for the parquet field
1202                if let Some(logical_type) = &field.logical_ty {
1203                    builder = builder.with_logical_type(Some(logical_type.clone()));
1204                }
1205                if let Some(precision) = field.precision {
1206                    builder = builder.with_precision(precision);
1207                }
1208                if let Some(scale) = field.scale {
1209                    builder = builder.with_scale(scale);
1210                }
1211                if let Some(byte_len) = field.byte_len {
1212                    builder = builder.with_length(byte_len);
1213                }
1214                Arc::new(builder.build().unwrap())
1215            })
1216            .collect::<Vec<_>>();
1217        let schema = SchemaType::group_type_builder("schema")
1218            .with_fields(schema_fields)
1219            .build()
1220            .unwrap();
1221
1222        Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1223    }
1224
1225    fn parquet_file_metrics() -> ParquetFileMetrics {
1226        let metrics = Arc::new(ExecutionPlanMetricsSet::new());
1227        ParquetFileMetrics::new(0, "file.parquet", &metrics)
1228    }
1229
1230    #[tokio::test]
1231    async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
1232        BloomFilterTest::new_data_index_bloom_encoding_stats()
1233            .with_expect_all_pruned()
1234            // generate pruning predicate `(String = "Hello_Not_exists")`
1235            .run(col(r#""String""#).eq(lit("Hello_Not_Exists")))
1236            .await
1237    }
1238
1239    #[tokio::test]
1240    async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
1241        BloomFilterTest::new_data_index_bloom_encoding_stats()
1242            .with_expect_all_pruned()
1243            // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
1244            .run(
1245                lit("1").eq(lit("1")).and(
1246                    col(r#""String""#)
1247                        .eq(lit("Hello_Not_Exists"))
1248                        .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
1249                ),
1250            )
1251            .await
1252    }
1253
1254    #[tokio::test]
1255    async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr_view() {
1256        BloomFilterTest::new_data_index_bloom_encoding_stats()
1257            .with_expect_all_pruned()
1258            // generate pruning predicate `(String = "Hello_Not_exists" OR String = "Hello_Not_exists2")`
1259            .run(
1260                lit("1").eq(lit("1")).and(
1261                    col(r#""String""#)
1262                        .eq(Expr::Literal(
1263                            ScalarValue::Utf8View(Some(String::from("Hello_Not_Exists"))),
1264                            None,
1265                        ))
1266                        .or(col(r#""String""#).eq(Expr::Literal(
1267                            ScalarValue::Utf8View(Some(String::from(
1268                                "Hello_Not_Exists2",
1269                            ))),
1270                            None,
1271                        ))),
1272                ),
1273            )
1274            .await
1275    }
1276
1277    #[tokio::test]
1278    async fn test_row_group_bloom_filter_pruning_predicate_sql_in() {
1279        // load parquet file
1280        let testdata = datafusion_common::test_util::parquet_test_data();
1281        let file_name = "data_index_bloom_encoding_stats.parquet";
1282        let path = format!("{testdata}/{file_name}");
1283        let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1284
1285        // generate pruning predicate
1286        let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1287
1288        let expr = col(r#""String""#).in_list(
1289            (1..25)
1290                .map(|i| lit(format!("Hello_Not_Exists{i}")))
1291                .collect::<Vec<_>>(),
1292            false,
1293        );
1294        let expr = logical2physical(&expr, &schema);
1295        let pruning_predicate =
1296            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1297
1298        let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1299            file_name,
1300            data,
1301            &pruning_predicate,
1302        )
1303        .await
1304        .unwrap();
1305        assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty());
1306    }
1307
1308    #[tokio::test]
1309    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
1310        BloomFilterTest::new_data_index_bloom_encoding_stats()
1311            .with_expect_none_pruned()
1312            // generate pruning predicate `(String = "Hello")`
1313            .run(col(r#""String""#).eq(lit("Hello")))
1314            .await
1315    }
1316
1317    #[tokio::test]
1318    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() {
1319        BloomFilterTest::new_data_index_bloom_encoding_stats()
1320            .with_expect_none_pruned()
1321            // generate pruning predicate `(String = "Hello") OR (String = "the quick")`
1322            .run(
1323                col(r#""String""#)
1324                    .eq(lit("Hello"))
1325                    .or(col(r#""String""#).eq(lit("the quick"))),
1326            )
1327            .await
1328    }
1329
1330    #[tokio::test]
1331    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() {
1332        BloomFilterTest::new_data_index_bloom_encoding_stats()
1333            .with_expect_none_pruned()
1334            // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
1335            .run(
1336                col(r#""String""#)
1337                    .eq(lit("Hello"))
1338                    .or(col(r#""String""#).eq(lit("the quick")))
1339                    .or(col(r#""String""#).eq(lit("are you"))),
1340            )
1341            .await
1342    }
1343
1344    #[tokio::test]
1345    async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values_view() {
1346        BloomFilterTest::new_data_index_bloom_encoding_stats()
1347            .with_expect_none_pruned()
1348            // generate pruning predicate `(String = "Hello") OR (String = "the quick") OR (String = "are you")`
1349            .run(
1350                col(r#""String""#)
1351                    .eq(Expr::Literal(
1352                        ScalarValue::Utf8View(Some(String::from("Hello"))),
1353                        None,
1354                    ))
1355                    .or(col(r#""String""#).eq(Expr::Literal(
1356                        ScalarValue::Utf8View(Some(String::from("the quick"))),
1357                        None,
1358                    )))
1359                    .or(col(r#""String""#).eq(Expr::Literal(
1360                        ScalarValue::Utf8View(Some(String::from("are you"))),
1361                        None,
1362                    ))),
1363            )
1364            .await
1365    }
1366
1367    #[tokio::test]
1368    async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() {
1369        BloomFilterTest::new_data_index_bloom_encoding_stats()
1370            .with_expect_none_pruned()
1371            // generate pruning predicate `(String = "foo") OR (String != "bar")`
1372            .run(
1373                col(r#""String""#)
1374                    .not_eq(lit("foo"))
1375                    .or(col(r#""String""#).not_eq(lit("bar"))),
1376            )
1377            .await
1378    }
1379
1380    #[tokio::test]
1381    async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
1382        // generate pruning predicate on a column without a bloom filter
1383        BloomFilterTest::new_all_types()
1384            .with_expect_none_pruned()
1385            .run(col(r#""string_col""#).eq(lit("0")))
1386            .await
1387    }
1388
1389    // What row groups are expected to be left after pruning
1390    #[derive(Debug)]
1391    enum ExpectedPruning {
1392        All,
1393        /// Only the specified row groups are expected to REMAIN (not what is pruned)
1394        Some(Vec<usize>),
1395        None,
1396    }
1397
1398    impl ExpectedPruning {
1399        /// asserts that the pruned row group match this expectation
1400        fn assert(&self, row_groups: &RowGroupAccessPlanFilter) {
1401            let num_row_groups = row_groups.access_plan.len();
1402            assert!(num_row_groups > 0);
1403            let num_pruned = (0..num_row_groups)
1404                .filter_map(|i| {
1405                    if row_groups.access_plan.should_scan(i) {
1406                        None
1407                    } else {
1408                        Some(1)
1409                    }
1410                })
1411                .sum::<usize>();
1412
1413            match self {
1414                Self::All => {
1415                    assert_eq!(
1416                        num_row_groups, num_pruned,
1417                        "Expected all row groups to be pruned, but got {row_groups:?}"
1418                    );
1419                }
1420                ExpectedPruning::None => {
1421                    assert_eq!(
1422                        num_pruned, 0,
1423                        "Expected no row groups to be pruned, but got {row_groups:?}"
1424                    );
1425                }
1426                ExpectedPruning::Some(expected) => {
1427                    let actual = row_groups.access_plan.row_group_indexes();
1428                    assert_eq!(expected, &actual, "Unexpected row groups pruned. Expected {expected:?}, got {actual:?}");
1429                }
1430            }
1431        }
1432    }
1433
1434    fn assert_pruned(row_groups: RowGroupAccessPlanFilter, expected: ExpectedPruning) {
1435        expected.assert(&row_groups);
1436    }
1437
1438    struct BloomFilterTest {
1439        file_name: String,
1440        schema: Schema,
1441        // which row groups are expected to be left after pruning
1442        post_pruning_row_groups: ExpectedPruning,
1443    }
1444
1445    impl BloomFilterTest {
1446        /// Return a test for data_index_bloom_encoding_stats.parquet
1447        /// Note the values in the `String` column are:
1448        /// ```sql
1449        /// > select * from './parquet-testing/data/data_index_bloom_encoding_stats.parquet';
1450        /// +-----------+
1451        /// | String    |
1452        /// +-----------+
1453        /// | Hello     |
1454        /// | This is   |
1455        /// | a         |
1456        /// | test      |
1457        /// | How       |
1458        /// | are you   |
1459        /// | doing     |
1460        /// | today     |
1461        /// | the quick |
1462        /// | brown fox |
1463        /// | jumps     |
1464        /// | over      |
1465        /// | the lazy  |
1466        /// | dog       |
1467        /// +-----------+
1468        /// ```
1469        fn new_data_index_bloom_encoding_stats() -> Self {
1470            Self {
1471                file_name: String::from("data_index_bloom_encoding_stats.parquet"),
1472                schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]),
1473                post_pruning_row_groups: ExpectedPruning::None,
1474            }
1475        }
1476
1477        // Return a test for alltypes_plain.parquet
1478        fn new_all_types() -> Self {
1479            Self {
1480                file_name: String::from("alltypes_plain.parquet"),
1481                schema: Schema::new(vec![Field::new(
1482                    "string_col",
1483                    DataType::Utf8,
1484                    false,
1485                )]),
1486                post_pruning_row_groups: ExpectedPruning::None,
1487            }
1488        }
1489
1490        /// Expect all row groups to be pruned
1491        pub fn with_expect_all_pruned(mut self) -> Self {
1492            self.post_pruning_row_groups = ExpectedPruning::All;
1493            self
1494        }
1495
1496        /// Expect all row groups not to be pruned
1497        pub fn with_expect_none_pruned(mut self) -> Self {
1498            self.post_pruning_row_groups = ExpectedPruning::None;
1499            self
1500        }
1501
1502        /// Prune this file using the specified expression and check that the expected row groups are left
1503        async fn run(self, expr: Expr) {
1504            let Self {
1505                file_name,
1506                schema,
1507                post_pruning_row_groups,
1508            } = self;
1509
1510            let testdata = datafusion_common::test_util::parquet_test_data();
1511            let path = format!("{testdata}/{file_name}");
1512            let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1513
1514            let expr = logical2physical(&expr, &schema);
1515            let pruning_predicate =
1516                PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1517
1518            let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1519                &file_name,
1520                data,
1521                &pruning_predicate,
1522            )
1523            .await
1524            .unwrap();
1525
1526            post_pruning_row_groups.assert(&pruned_row_groups);
1527        }
1528    }
1529
1530    /// Evaluates the pruning predicate on the specified row groups and returns the row groups that are left
1531    async fn test_row_group_bloom_filter_pruning_predicate(
1532        file_name: &str,
1533        data: bytes::Bytes,
1534        pruning_predicate: &PruningPredicate,
1535    ) -> Result<RowGroupAccessPlanFilter> {
1536        use object_store::{ObjectMeta, ObjectStore};
1537
1538        let object_meta = ObjectMeta {
1539            location: object_store::path::Path::parse(file_name).expect("creating path"),
1540            last_modified: chrono::DateTime::from(std::time::SystemTime::now()),
1541            size: data.len() as u64,
1542            e_tag: None,
1543            version: None,
1544        };
1545        let in_memory = object_store::memory::InMemory::new();
1546        in_memory
1547            .put(&object_meta.location, data.into())
1548            .await
1549            .expect("put parquet file into in memory object store");
1550
1551        let metrics = ExecutionPlanMetricsSet::new();
1552        let file_metrics =
1553            ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
1554        let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location)
1555            .with_file_size(object_meta.size);
1556
1557        let reader = ParquetFileReader {
1558            inner,
1559            file_metrics: file_metrics.clone(),
1560        };
1561        let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
1562
1563        let access_plan = ParquetAccessPlan::new_all(builder.metadata().num_row_groups());
1564        let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan);
1565        pruned_row_groups
1566            .prune_by_bloom_filters(
1567                pruning_predicate.schema(),
1568                &mut builder,
1569                pruning_predicate,
1570                &file_metrics,
1571            )
1572            .await;
1573
1574        Ok(pruned_row_groups)
1575    }
1576}