1use std::collections::{HashMap, HashSet};
19use std::sync::Arc;
20
21use super::{ParquetAccessPlan, ParquetFileMetrics};
22use arrow::array::{ArrayRef, BooleanArray};
23use arrow::datatypes::Schema;
24use datafusion_common::pruning::PruningStatistics;
25use datafusion_common::{Column, Result, ScalarValue};
26use datafusion_datasource::FileRange;
27use datafusion_pruning::PruningPredicate;
28use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
29use parquet::arrow::parquet_column;
30use parquet::basic::Type;
31use parquet::data_type::Decimal;
32use parquet::schema::types::SchemaDescriptor;
33use parquet::{
34 arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder},
35 bloom_filter::Sbbf,
36 file::metadata::RowGroupMetaData,
37};
38
39#[derive(Debug, Clone, PartialEq)]
46pub struct RowGroupAccessPlanFilter {
47 access_plan: ParquetAccessPlan,
49}
50
51impl RowGroupAccessPlanFilter {
52 pub fn new(access_plan: ParquetAccessPlan) -> Self {
55 Self { access_plan }
56 }
57
58 pub fn is_empty(&self) -> bool {
60 self.access_plan.is_empty()
61 }
62
63 pub fn remaining_row_group_count(&self) -> usize {
65 self.access_plan.row_group_index_iter().count()
66 }
67
68 pub fn build(self) -> ParquetAccessPlan {
70 self.access_plan
71 }
72
73 pub fn prune_by_range(&mut self, groups: &[RowGroupMetaData], range: &FileRange) {
80 assert_eq!(groups.len(), self.access_plan.len());
81 for (idx, metadata) in groups.iter().enumerate() {
82 if !self.access_plan.should_scan(idx) {
83 continue;
84 }
85
86 let col = metadata.column(0);
92 let offset = col
93 .dictionary_page_offset()
94 .unwrap_or_else(|| col.data_page_offset());
95 if !range.contains(offset) {
96 self.access_plan.skip(idx);
97 }
98 }
99 }
100 pub fn prune_by_statistics(
111 &mut self,
112 arrow_schema: &Schema,
113 parquet_schema: &SchemaDescriptor,
114 groups: &[RowGroupMetaData],
115 predicate: &PruningPredicate,
116 metrics: &ParquetFileMetrics,
117 ) {
118 let _timer_guard = metrics.statistics_eval_time.timer();
120
121 assert_eq!(groups.len(), self.access_plan.len());
122 let row_group_indexes = self.access_plan.row_group_indexes();
124 let row_group_metadatas = row_group_indexes
125 .iter()
126 .map(|&i| &groups[i])
127 .collect::<Vec<_>>();
128
129 let pruning_stats = RowGroupPruningStatistics {
130 parquet_schema,
131 row_group_metadatas,
132 arrow_schema,
133 };
134
135 match predicate.prune(&pruning_stats) {
137 Ok(values) => {
138 for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
140 if !value {
141 self.access_plan.skip(*idx);
142 metrics.row_groups_pruned_statistics.add_pruned(1);
143 } else {
144 metrics.row_groups_pruned_statistics.add_matched(1);
145 }
146 }
147 }
148 Err(e) => {
150 log::debug!("Error evaluating row group predicate values {e}");
151 metrics.predicate_evaluation_errors.add(1);
152 }
153 }
154 }
155
156 pub async fn prune_by_bloom_filters<T: AsyncFileReader + Send + 'static>(
164 &mut self,
165 arrow_schema: &Schema,
166 builder: &mut ParquetRecordBatchStreamBuilder<T>,
167 predicate: &PruningPredicate,
168 metrics: &ParquetFileMetrics,
169 ) {
170 let _timer_guard = metrics.bloom_filter_eval_time.timer();
172
173 assert_eq!(builder.metadata().num_row_groups(), self.access_plan.len());
174 for idx in 0..self.access_plan.len() {
175 if !self.access_plan.should_scan(idx) {
176 continue;
177 }
178
179 let literal_columns = predicate.literal_columns();
181 let mut column_sbbf = HashMap::with_capacity(literal_columns.len());
182
183 for column_name in literal_columns {
184 let Some((column_idx, _field)) =
185 parquet_column(builder.parquet_schema(), arrow_schema, &column_name)
186 else {
187 continue;
188 };
189
190 let bf = match builder
191 .get_row_group_column_bloom_filter(idx, column_idx)
192 .await
193 {
194 Ok(Some(bf)) => bf,
195 Ok(None) => continue, Err(e) => {
197 log::debug!("Ignoring error reading bloom filter: {e}");
198 metrics.predicate_evaluation_errors.add(1);
199 continue;
200 }
201 };
202 let physical_type =
203 builder.parquet_schema().column(column_idx).physical_type();
204
205 column_sbbf.insert(column_name.to_string(), (bf, physical_type));
206 }
207
208 let stats = BloomFilterStatistics { column_sbbf };
209
210 let prune_group = match predicate.prune(&stats) {
212 Ok(values) => !values[0],
213 Err(e) => {
214 log::debug!(
215 "Error evaluating row group predicate on bloom filter: {e}"
216 );
217 metrics.predicate_evaluation_errors.add(1);
218 false
219 }
220 };
221
222 if prune_group {
223 metrics.row_groups_pruned_bloom_filter.add_pruned(1);
224 self.access_plan.skip(idx)
225 } else {
226 metrics.row_groups_pruned_bloom_filter.add_matched(1);
227 }
228 }
229 }
230}
231struct BloomFilterStatistics {
233 column_sbbf: HashMap<String, (Sbbf, Type)>,
235}
236
237impl BloomFilterStatistics {
238 fn check_scalar(sbbf: &Sbbf, value: &ScalarValue, parquet_type: &Type) -> bool {
243 match value {
244 ScalarValue::Utf8(Some(v))
245 | ScalarValue::Utf8View(Some(v))
246 | ScalarValue::LargeUtf8(Some(v)) => sbbf.check(&v.as_str()),
247 ScalarValue::Binary(Some(v))
248 | ScalarValue::BinaryView(Some(v))
249 | ScalarValue::LargeBinary(Some(v)) => sbbf.check(v),
250 ScalarValue::FixedSizeBinary(_size, Some(v)) => sbbf.check(v),
251 ScalarValue::Boolean(Some(v)) => sbbf.check(v),
252 ScalarValue::Float64(Some(v)) => sbbf.check(v),
253 ScalarValue::Float32(Some(v)) => sbbf.check(v),
254 ScalarValue::Int64(Some(v)) => sbbf.check(v),
255 ScalarValue::Int32(Some(v)) => sbbf.check(v),
256 ScalarValue::UInt64(Some(v)) => sbbf.check(v),
257 ScalarValue::UInt32(Some(v)) => sbbf.check(v),
258 ScalarValue::Decimal128(Some(v), p, s) => match parquet_type {
259 Type::INT32 => {
260 if *p > 9 {
263 return true;
268 }
269 let b = (*v as i32).to_le_bytes();
270 let decimal = Decimal::Int32 {
272 value: b,
273 precision: *p as i32,
274 scale: *s as i32,
275 };
276 sbbf.check(&decimal)
277 }
278 Type::INT64 => {
279 if *p > 18 {
280 return true;
281 }
282 let b = (*v as i64).to_le_bytes();
283 let decimal = Decimal::Int64 {
284 value: b,
285 precision: *p as i32,
286 scale: *s as i32,
287 };
288 sbbf.check(&decimal)
289 }
290 Type::FIXED_LEN_BYTE_ARRAY => {
291 let b = v.to_be_bytes().to_vec();
293 let decimal = Decimal::Bytes {
295 value: b.into(),
296 precision: *p as i32,
297 scale: *s as i32,
298 };
299 sbbf.check(&decimal)
300 }
301 _ => true,
302 },
303 ScalarValue::Dictionary(_, inner) => {
304 BloomFilterStatistics::check_scalar(sbbf, inner, parquet_type)
305 }
306 _ => true,
307 }
308 }
309}
310
311impl PruningStatistics for BloomFilterStatistics {
312 fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
313 None
314 }
315
316 fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
317 None
318 }
319
320 fn num_containers(&self) -> usize {
321 1
322 }
323
324 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
325 None
326 }
327
328 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
329 None
330 }
331
332 fn contained(
338 &self,
339 column: &Column,
340 values: &HashSet<ScalarValue>,
341 ) -> Option<BooleanArray> {
342 let (sbbf, parquet_type) = self.column_sbbf.get(column.name.as_str())?;
343
344 let known_not_present = values
350 .iter()
351 .map(|value| BloomFilterStatistics::check_scalar(sbbf, value, parquet_type))
352 .all(|v| !v);
355
356 let contains = if known_not_present {
357 Some(false)
358 } else {
359 None
363 };
364
365 Some(BooleanArray::from(vec![contains]))
366 }
367}
368
369struct RowGroupPruningStatistics<'a> {
371 parquet_schema: &'a SchemaDescriptor,
372 row_group_metadatas: Vec<&'a RowGroupMetaData>,
373 arrow_schema: &'a Schema,
374}
375
376impl<'a> RowGroupPruningStatistics<'a> {
377 fn metadata_iter(&'a self) -> impl Iterator<Item = &'a RowGroupMetaData> + 'a {
379 self.row_group_metadatas.iter().copied()
380 }
381
382 fn statistics_converter<'b>(
383 &'a self,
384 column: &'b Column,
385 ) -> Result<StatisticsConverter<'a>> {
386 Ok(StatisticsConverter::try_new(
387 &column.name,
388 self.arrow_schema,
389 self.parquet_schema,
390 )?)
391 }
392}
393
394impl PruningStatistics for RowGroupPruningStatistics<'_> {
395 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
396 self.statistics_converter(column)
397 .and_then(|c| Ok(c.row_group_mins(self.metadata_iter())?))
398 .ok()
399 }
400
401 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
402 self.statistics_converter(column)
403 .and_then(|c| Ok(c.row_group_maxes(self.metadata_iter())?))
404 .ok()
405 }
406
407 fn num_containers(&self) -> usize {
408 self.row_group_metadatas.len()
409 }
410
411 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
412 self.statistics_converter(column)
413 .and_then(|c| Ok(c.row_group_null_counts(self.metadata_iter())?))
414 .ok()
415 .map(|counts| Arc::new(counts) as ArrayRef)
416 }
417
418 fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
419 self.statistics_converter(column)
421 .and_then(|c| Ok(c.row_group_row_counts(self.metadata_iter())?))
422 .ok()
423 .flatten()
424 .map(|counts| Arc::new(counts) as ArrayRef)
425 }
426
427 fn contained(
428 &self,
429 _column: &Column,
430 _values: &HashSet<ScalarValue>,
431 ) -> Option<BooleanArray> {
432 None
433 }
434}
435
436#[cfg(test)]
437mod tests {
438 use std::ops::Rem;
439 use std::sync::Arc;
440
441 use super::*;
442 use crate::reader::ParquetFileReader;
443
444 use arrow::datatypes::DataType::Decimal128;
445 use arrow::datatypes::{DataType, Field};
446 use datafusion_common::Result;
447 use datafusion_expr::{cast, col, lit, Expr};
448 use datafusion_physical_expr::planner::logical2physical;
449 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
450 use parquet::arrow::async_reader::ParquetObjectReader;
451 use parquet::arrow::ArrowSchemaConverter;
452 use parquet::basic::LogicalType;
453 use parquet::data_type::{ByteArray, FixedLenByteArray};
454 use parquet::file::metadata::ColumnChunkMetaData;
455 use parquet::{
456 basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
457 schema::types::SchemaDescPtr,
458 };
459
460 struct PrimitiveTypeField {
461 name: &'static str,
462 physical_ty: PhysicalType,
463 logical_ty: Option<LogicalType>,
464 precision: Option<i32>,
465 scale: Option<i32>,
466 byte_len: Option<i32>,
467 }
468
469 impl PrimitiveTypeField {
470 fn new(name: &'static str, physical_ty: PhysicalType) -> Self {
471 Self {
472 name,
473 physical_ty,
474 logical_ty: None,
475 precision: None,
476 scale: None,
477 byte_len: None,
478 }
479 }
480
481 fn with_logical_type(mut self, logical_type: LogicalType) -> Self {
482 self.logical_ty = Some(logical_type);
483 self
484 }
485
486 fn with_precision(mut self, precision: i32) -> Self {
487 self.precision = Some(precision);
488 self
489 }
490
491 fn with_scale(mut self, scale: i32) -> Self {
492 self.scale = Some(scale);
493 self
494 }
495
496 fn with_byte_len(mut self, byte_len: i32) -> Self {
497 self.byte_len = Some(byte_len);
498 self
499 }
500 }
501
502 #[test]
503 fn remaining_row_group_count_reports_non_skipped_groups() {
504 let mut filter = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(4));
505 assert_eq!(filter.remaining_row_group_count(), 4);
506
507 filter.access_plan.skip(1);
508 assert_eq!(filter.remaining_row_group_count(), 3);
509
510 filter.access_plan.skip(3);
511 assert_eq!(filter.remaining_row_group_count(), 2);
512 }
513
514 #[test]
515 fn row_group_pruning_predicate_simple_expr() {
516 use datafusion_expr::{col, lit};
517 let schema =
519 Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
520 let expr = col("c1").gt(lit(15));
521 let expr = logical2physical(&expr, &schema);
522 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
523
524 let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
525 let schema_descr = get_test_schema_descr(vec![field]);
526 let rgm1 = get_row_group_meta_data(
527 &schema_descr,
528 vec![ParquetStatistics::int32(
529 Some(1),
530 Some(10),
531 None,
532 Some(0),
533 false,
534 )],
535 );
536 let rgm2 = get_row_group_meta_data(
537 &schema_descr,
538 vec![ParquetStatistics::int32(
539 Some(11),
540 Some(20),
541 None,
542 Some(0),
543 false,
544 )],
545 );
546
547 let metrics = parquet_file_metrics();
548 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
549 row_groups.prune_by_statistics(
550 &schema,
551 &schema_descr,
552 &[rgm1, rgm2],
553 &pruning_predicate,
554 &metrics,
555 );
556 assert_pruned(row_groups, ExpectedPruning::Some(vec![1]))
557 }
558
559 #[test]
560 fn row_group_pruning_predicate_missing_stats() {
561 use datafusion_expr::{col, lit};
562 let schema =
564 Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
565 let expr = col("c1").gt(lit(15));
566 let expr = logical2physical(&expr, &schema);
567 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
568
569 let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
570 let schema_descr = get_test_schema_descr(vec![field]);
571 let rgm1 = get_row_group_meta_data(
572 &schema_descr,
573 vec![ParquetStatistics::int32(None, None, None, Some(0), false)],
574 );
575 let rgm2 = get_row_group_meta_data(
576 &schema_descr,
577 vec![ParquetStatistics::int32(
578 Some(11),
579 Some(20),
580 None,
581 Some(0),
582 false,
583 )],
584 );
585 let metrics = parquet_file_metrics();
586 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
589 row_groups.prune_by_statistics(
590 &schema,
591 &schema_descr,
592 &[rgm1, rgm2],
593 &pruning_predicate,
594 &metrics,
595 );
596 assert_pruned(row_groups, ExpectedPruning::None);
597 }
598
599 #[test]
600 fn row_group_pruning_predicate_partial_expr() {
601 use datafusion_expr::{col, lit};
602 let schema = Arc::new(Schema::new(vec![
605 Field::new("c1", DataType::Int32, false),
606 Field::new("c2", DataType::Int32, false),
607 ]));
608 let expr = col("c1").gt(lit(15)).and(col("c2").rem(lit(2)).eq(lit(0)));
609 let expr = logical2physical(&expr, &schema);
610 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
611
612 let schema_descr = get_test_schema_descr(vec![
613 PrimitiveTypeField::new("c1", PhysicalType::INT32),
614 PrimitiveTypeField::new("c2", PhysicalType::INT32),
615 ]);
616 let rgm1 = get_row_group_meta_data(
617 &schema_descr,
618 vec![
619 ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
620 ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
621 ],
622 );
623 let rgm2 = get_row_group_meta_data(
624 &schema_descr,
625 vec![
626 ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
627 ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
628 ],
629 );
630
631 let metrics = parquet_file_metrics();
632 let groups = &[rgm1, rgm2];
633 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
636 row_groups.prune_by_statistics(
637 &schema,
638 &schema_descr,
639 groups,
640 &pruning_predicate,
641 &metrics,
642 );
643 assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
644
645 let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
648 let expr = logical2physical(&expr, &schema);
649 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
650
651 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
654 row_groups.prune_by_statistics(
655 &schema,
656 &schema_descr,
657 groups,
658 &pruning_predicate,
659 &metrics,
660 );
661 assert_pruned(row_groups, ExpectedPruning::None);
662 }
663
664 #[test]
665 fn row_group_pruning_predicate_file_schema() {
666 use datafusion_expr::{col, lit};
667 let table_schema = Arc::new(Schema::new(vec![
670 Field::new("c1", DataType::Int32, false),
671 Field::new("c2", DataType::Int32, false),
672 ]));
673 let expr = col("c1").gt(lit(0));
674 let expr = logical2physical(&expr, &table_schema);
675 let pruning_predicate =
676 PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
677
678 let file_schema = Arc::new(Schema::new(vec![
681 Field::new("c2", DataType::Int32, false),
682 Field::new("c1", DataType::Int32, false),
683 ]));
684 let schema_descr = get_test_schema_descr(vec![
685 PrimitiveTypeField::new("c2", PhysicalType::INT32),
686 PrimitiveTypeField::new("c1", PhysicalType::INT32),
687 ]);
688 let rgm1 = get_row_group_meta_data(
690 &schema_descr,
691 vec![
692 ParquetStatistics::int32(Some(-10), Some(-1), None, Some(0), false), ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
694 ],
695 );
696 let rgm2 = get_row_group_meta_data(
698 &schema_descr,
699 vec![
700 ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
701 ParquetStatistics::int32(Some(-10), Some(-1), None, Some(0), false),
702 ],
703 );
704
705 let metrics = parquet_file_metrics();
706 let groups = &[rgm1, rgm2];
707 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
710 row_groups.prune_by_statistics(
711 &file_schema,
712 &schema_descr,
713 groups,
714 &pruning_predicate,
715 &metrics,
716 );
717 assert_pruned(row_groups, ExpectedPruning::Some(vec![0]));
718 }
719
720 fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
721 let schema_descr = get_test_schema_descr(vec![
722 PrimitiveTypeField::new("c1", PhysicalType::INT32),
723 PrimitiveTypeField::new("c2", PhysicalType::BOOLEAN),
724 ]);
725 let rgm1 = get_row_group_meta_data(
726 &schema_descr,
727 vec![
728 ParquetStatistics::int32(Some(1), Some(10), None, Some(0), false),
729 ParquetStatistics::boolean(Some(false), Some(true), None, Some(0), false),
730 ],
731 );
732 let rgm2 = get_row_group_meta_data(
733 &schema_descr,
734 vec![
735 ParquetStatistics::int32(Some(11), Some(20), None, Some(0), false),
736 ParquetStatistics::boolean(Some(false), Some(true), None, Some(1), false),
737 ],
738 );
739 vec![rgm1, rgm2]
740 }
741
742 #[test]
743 fn row_group_pruning_predicate_null_expr() {
744 use datafusion_expr::{col, lit};
745 let schema = Arc::new(Schema::new(vec![
747 Field::new("c1", DataType::Int32, false),
748 Field::new("c2", DataType::Boolean, false),
749 ]));
750 let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
751 let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
752 let expr = logical2physical(&expr, &schema);
753 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
754 let groups = gen_row_group_meta_data_for_pruning_predicate();
755
756 let metrics = parquet_file_metrics();
757 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(2));
759 row_groups.prune_by_statistics(
760 &schema,
761 &schema_descr,
762 &groups,
763 &pruning_predicate,
764 &metrics,
765 );
766 assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
767 }
768
769 #[test]
770 fn row_group_pruning_predicate_eq_null_expr() {
771 use datafusion_expr::{col, lit};
772 let schema = Arc::new(Schema::new(vec![
776 Field::new("c1", DataType::Int32, false),
777 Field::new("c2", DataType::Boolean, false),
778 ]));
779 let schema_descr = ArrowSchemaConverter::new().convert(&schema).unwrap();
780 let expr = col("c1")
781 .gt(lit(15))
782 .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
783 let expr = logical2physical(&expr, &schema);
784 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
785 let groups = gen_row_group_meta_data_for_pruning_predicate();
786
787 let metrics = parquet_file_metrics();
788 let mut row_groups =
791 RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(groups.len()));
792 row_groups.prune_by_statistics(
793 &schema,
794 &schema_descr,
795 &groups,
796 &pruning_predicate,
797 &metrics,
798 );
799 assert_pruned(row_groups, ExpectedPruning::Some(vec![1]));
800 }
801
802 #[test]
803 fn row_group_pruning_predicate_decimal_type() {
804 let schema =
811 Arc::new(Schema::new(vec![Field::new("c1", Decimal128(9, 2), false)]));
812 let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
813 .with_logical_type(LogicalType::Decimal {
814 scale: 2,
815 precision: 9,
816 })
817 .with_scale(2)
818 .with_precision(9);
819 let schema_descr = get_test_schema_descr(vec![field]);
820 let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
821 let expr = logical2physical(&expr, &schema);
822 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
823 let rgm1 = get_row_group_meta_data(
824 &schema_descr,
825 vec![ParquetStatistics::int32(
828 Some(100),
829 Some(600),
830 None,
831 Some(0),
832 false,
833 )],
834 );
835 let rgm2 = get_row_group_meta_data(
836 &schema_descr,
837 vec![ParquetStatistics::int32(
840 Some(10),
841 Some(20),
842 None,
843 Some(0),
844 false,
845 )],
846 );
847 let rgm3 = get_row_group_meta_data(
848 &schema_descr,
849 vec![ParquetStatistics::int32(
852 Some(100),
853 None,
854 None,
855 Some(0),
856 false,
857 )],
858 );
859 let metrics = parquet_file_metrics();
860 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
861 row_groups.prune_by_statistics(
862 &schema,
863 &schema_descr,
864 &[rgm1, rgm2, rgm3],
865 &pruning_predicate,
866 &metrics,
867 );
868 assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 2]));
869 }
870
871 #[test]
872 fn row_group_pruning_predicate_decimal_type2() {
873 let schema =
878 Arc::new(Schema::new(vec![Field::new("c1", Decimal128(9, 0), false)]));
879
880 let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
881 .with_logical_type(LogicalType::Decimal {
882 scale: 0,
883 precision: 9,
884 })
885 .with_scale(0)
886 .with_precision(9);
887 let schema_descr = get_test_schema_descr(vec![field]);
888 let expr = cast(col("c1"), Decimal128(11, 2)).gt(cast(
889 lit(ScalarValue::Decimal128(Some(500), 5, 2)),
890 Decimal128(11, 2),
891 ));
892 let expr = logical2physical(&expr, &schema);
893 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
894 let rgm1 = get_row_group_meta_data(
895 &schema_descr,
896 vec![ParquetStatistics::int32(
899 Some(100),
900 Some(600),
901 None,
902 Some(0),
903 false,
904 )],
905 );
906 let rgm2 = get_row_group_meta_data(
907 &schema_descr,
908 vec![ParquetStatistics::int32(
911 Some(10),
912 Some(20),
913 None,
914 Some(0),
915 false,
916 )],
917 );
918 let rgm3 = get_row_group_meta_data(
919 &schema_descr,
920 vec![ParquetStatistics::int32(
923 Some(0),
924 Some(2),
925 None,
926 Some(0),
927 false,
928 )],
929 );
930 let rgm4 = get_row_group_meta_data(
931 &schema_descr,
932 vec![ParquetStatistics::int32(
936 None,
937 Some(2),
938 None,
939 Some(0),
940 false,
941 )],
942 );
943 let rgm5 = get_row_group_meta_data(
944 &schema_descr,
945 vec![ParquetStatistics::int32(
949 Some(2),
950 None,
951 None,
952 Some(0),
953 false,
954 )],
955 );
956 let metrics = parquet_file_metrics();
957 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(5));
958 row_groups.prune_by_statistics(
959 &schema,
960 &schema_descr,
961 &[rgm1, rgm2, rgm3, rgm4, rgm5],
962 &pruning_predicate,
963 &metrics,
964 );
965 assert_pruned(row_groups, ExpectedPruning::Some(vec![0, 1, 4]));
966 }
967 #[test]
968 fn row_group_pruning_predicate_decimal_type3() {
969 let schema = Arc::new(Schema::new(vec![Field::new(
971 "c1",
972 Decimal128(18, 2),
973 false,
974 )]));
975 let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
976 .with_logical_type(LogicalType::Decimal {
977 scale: 2,
978 precision: 18,
979 })
980 .with_scale(2)
981 .with_precision(18);
982 let schema_descr = get_test_schema_descr(vec![field]);
983 let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
984 let expr = logical2physical(&expr, &schema);
985 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
986 let rgm1 = get_row_group_meta_data(
987 &schema_descr,
988 vec![ParquetStatistics::int32(
990 Some(600),
991 Some(800),
992 None,
993 Some(0),
994 false,
995 )],
996 );
997 let rgm2 = get_row_group_meta_data(
998 &schema_descr,
999 vec![ParquetStatistics::int64(
1001 Some(10),
1002 Some(20),
1003 None,
1004 Some(0),
1005 false,
1006 )],
1007 );
1008 let rgm3 = get_row_group_meta_data(
1009 &schema_descr,
1010 vec![ParquetStatistics::int64(None, None, None, Some(0), false)],
1012 );
1013 let metrics = parquet_file_metrics();
1014 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1015 row_groups.prune_by_statistics(
1016 &schema,
1017 &schema_descr,
1018 &[rgm1, rgm2, rgm3],
1019 &pruning_predicate,
1020 &metrics,
1021 );
1022 assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1023 }
1024 #[test]
1025 fn row_group_pruning_predicate_decimal_type4() {
1026 let schema = Arc::new(Schema::new(vec![Field::new(
1029 "c1",
1030 Decimal128(18, 2),
1031 false,
1032 )]));
1033 let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
1034 .with_logical_type(LogicalType::Decimal {
1035 scale: 2,
1036 precision: 18,
1037 })
1038 .with_scale(2)
1039 .with_precision(18)
1040 .with_byte_len(16);
1041 let schema_descr = get_test_schema_descr(vec![field]);
1042 let left = cast(col("c1"), Decimal128(28, 3));
1044 let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
1045 let expr = logical2physical(&expr, &schema);
1046 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
1047 let rgm1 = get_row_group_meta_data(
1049 &schema_descr,
1050 vec![ParquetStatistics::fixed_len_byte_array(
1051 Some(FixedLenByteArray::from(ByteArray::from(
1053 500i128.to_be_bytes().to_vec(),
1054 ))),
1055 Some(FixedLenByteArray::from(ByteArray::from(
1057 8000i128.to_be_bytes().to_vec(),
1058 ))),
1059 None,
1060 Some(0),
1061 false,
1062 )],
1063 );
1064 let rgm2 = get_row_group_meta_data(
1065 &schema_descr,
1066 vec![ParquetStatistics::fixed_len_byte_array(
1067 Some(FixedLenByteArray::from(ByteArray::from(
1069 500i128.to_be_bytes().to_vec(),
1070 ))),
1071 Some(FixedLenByteArray::from(ByteArray::from(
1073 20000i128.to_be_bytes().to_vec(),
1074 ))),
1075 None,
1076 Some(0),
1077 false,
1078 )],
1079 );
1080
1081 let rgm3 = get_row_group_meta_data(
1082 &schema_descr,
1083 vec![ParquetStatistics::fixed_len_byte_array(
1084 None,
1085 None,
1086 None,
1087 Some(0),
1088 false,
1089 )],
1090 );
1091 let metrics = parquet_file_metrics();
1092 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1093 row_groups.prune_by_statistics(
1094 &schema,
1095 &schema_descr,
1096 &[rgm1, rgm2, rgm3],
1097 &pruning_predicate,
1098 &metrics,
1099 );
1100 assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1101 }
1102 #[test]
1103 fn row_group_pruning_predicate_decimal_type5() {
1104 let schema = Arc::new(Schema::new(vec![Field::new(
1107 "c1",
1108 Decimal128(18, 2),
1109 false,
1110 )]));
1111 let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
1112 .with_logical_type(LogicalType::Decimal {
1113 scale: 2,
1114 precision: 18,
1115 })
1116 .with_scale(2)
1117 .with_precision(18)
1118 .with_byte_len(16);
1119 let schema_descr = get_test_schema_descr(vec![field]);
1120 let left = cast(col("c1"), Decimal128(28, 3));
1122 let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
1123 let expr = logical2physical(&expr, &schema);
1124 let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
1125 let rgm1 = get_row_group_meta_data(
1127 &schema_descr,
1128 vec![ParquetStatistics::byte_array(
1129 Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
1131 Some(ByteArray::from(8000i128.to_be_bytes().to_vec())),
1133 None,
1134 Some(0),
1135 false,
1136 )],
1137 );
1138 let rgm2 = get_row_group_meta_data(
1139 &schema_descr,
1140 vec![ParquetStatistics::byte_array(
1141 Some(ByteArray::from(500i128.to_be_bytes().to_vec())),
1143 Some(ByteArray::from(20000i128.to_be_bytes().to_vec())),
1145 None,
1146 Some(0),
1147 false,
1148 )],
1149 );
1150 let rgm3 = get_row_group_meta_data(
1151 &schema_descr,
1152 vec![ParquetStatistics::byte_array(
1153 None,
1154 None,
1155 None,
1156 Some(0),
1157 false,
1158 )],
1159 );
1160 let metrics = parquet_file_metrics();
1161 let mut row_groups = RowGroupAccessPlanFilter::new(ParquetAccessPlan::new_all(3));
1162 row_groups.prune_by_statistics(
1163 &schema,
1164 &schema_descr,
1165 &[rgm1, rgm2, rgm3],
1166 &pruning_predicate,
1167 &metrics,
1168 );
1169 assert_pruned(row_groups, ExpectedPruning::Some(vec![1, 2]));
1170 }
1171
1172 fn get_row_group_meta_data(
1173 schema_descr: &SchemaDescPtr,
1174 column_statistics: Vec<ParquetStatistics>,
1175 ) -> RowGroupMetaData {
1176 let mut columns = vec![];
1177 let number_row = 1000;
1178 for (i, s) in column_statistics.iter().enumerate() {
1179 let column = ColumnChunkMetaData::builder(schema_descr.column(i))
1180 .set_statistics(s.clone())
1181 .set_num_values(number_row)
1182 .build()
1183 .unwrap();
1184 columns.push(column);
1185 }
1186 RowGroupMetaData::builder(schema_descr.clone())
1187 .set_num_rows(number_row)
1188 .set_total_byte_size(2000)
1189 .set_column_metadata(columns)
1190 .build()
1191 .unwrap()
1192 }
1193
1194 fn get_test_schema_descr(fields: Vec<PrimitiveTypeField>) -> SchemaDescPtr {
1195 use parquet::schema::types::Type as SchemaType;
1196 let schema_fields = fields
1197 .iter()
1198 .map(|field| {
1199 let mut builder =
1200 SchemaType::primitive_type_builder(field.name, field.physical_ty);
1201 if let Some(logical_type) = &field.logical_ty {
1203 builder = builder.with_logical_type(Some(logical_type.clone()));
1204 }
1205 if let Some(precision) = field.precision {
1206 builder = builder.with_precision(precision);
1207 }
1208 if let Some(scale) = field.scale {
1209 builder = builder.with_scale(scale);
1210 }
1211 if let Some(byte_len) = field.byte_len {
1212 builder = builder.with_length(byte_len);
1213 }
1214 Arc::new(builder.build().unwrap())
1215 })
1216 .collect::<Vec<_>>();
1217 let schema = SchemaType::group_type_builder("schema")
1218 .with_fields(schema_fields)
1219 .build()
1220 .unwrap();
1221
1222 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
1223 }
1224
1225 fn parquet_file_metrics() -> ParquetFileMetrics {
1226 let metrics = Arc::new(ExecutionPlanMetricsSet::new());
1227 ParquetFileMetrics::new(0, "file.parquet", &metrics)
1228 }
1229
1230 #[tokio::test]
1231 async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
1232 BloomFilterTest::new_data_index_bloom_encoding_stats()
1233 .with_expect_all_pruned()
1234 .run(col(r#""String""#).eq(lit("Hello_Not_Exists")))
1236 .await
1237 }
1238
1239 #[tokio::test]
1240 async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr() {
1241 BloomFilterTest::new_data_index_bloom_encoding_stats()
1242 .with_expect_all_pruned()
1243 .run(
1245 lit("1").eq(lit("1")).and(
1246 col(r#""String""#)
1247 .eq(lit("Hello_Not_Exists"))
1248 .or(col(r#""String""#).eq(lit("Hello_Not_Exists2"))),
1249 ),
1250 )
1251 .await
1252 }
1253
1254 #[tokio::test]
1255 async fn test_row_group_bloom_filter_pruning_predicate_multiple_expr_view() {
1256 BloomFilterTest::new_data_index_bloom_encoding_stats()
1257 .with_expect_all_pruned()
1258 .run(
1260 lit("1").eq(lit("1")).and(
1261 col(r#""String""#)
1262 .eq(Expr::Literal(
1263 ScalarValue::Utf8View(Some(String::from("Hello_Not_Exists"))),
1264 None,
1265 ))
1266 .or(col(r#""String""#).eq(Expr::Literal(
1267 ScalarValue::Utf8View(Some(String::from(
1268 "Hello_Not_Exists2",
1269 ))),
1270 None,
1271 ))),
1272 ),
1273 )
1274 .await
1275 }
1276
1277 #[tokio::test]
1278 async fn test_row_group_bloom_filter_pruning_predicate_sql_in() {
1279 let testdata = datafusion_common::test_util::parquet_test_data();
1281 let file_name = "data_index_bloom_encoding_stats.parquet";
1282 let path = format!("{testdata}/{file_name}");
1283 let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1284
1285 let schema = Schema::new(vec![Field::new("String", DataType::Utf8, false)]);
1287
1288 let expr = col(r#""String""#).in_list(
1289 (1..25)
1290 .map(|i| lit(format!("Hello_Not_Exists{i}")))
1291 .collect::<Vec<_>>(),
1292 false,
1293 );
1294 let expr = logical2physical(&expr, &schema);
1295 let pruning_predicate =
1296 PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1297
1298 let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1299 file_name,
1300 data,
1301 &pruning_predicate,
1302 )
1303 .await
1304 .unwrap();
1305 assert!(pruned_row_groups.access_plan.row_group_indexes().is_empty());
1306 }
1307
1308 #[tokio::test]
1309 async fn test_row_group_bloom_filter_pruning_predicate_with_exists_value() {
1310 BloomFilterTest::new_data_index_bloom_encoding_stats()
1311 .with_expect_none_pruned()
1312 .run(col(r#""String""#).eq(lit("Hello")))
1314 .await
1315 }
1316
1317 #[tokio::test]
1318 async fn test_row_group_bloom_filter_pruning_predicate_with_exists_2_values() {
1319 BloomFilterTest::new_data_index_bloom_encoding_stats()
1320 .with_expect_none_pruned()
1321 .run(
1323 col(r#""String""#)
1324 .eq(lit("Hello"))
1325 .or(col(r#""String""#).eq(lit("the quick"))),
1326 )
1327 .await
1328 }
1329
1330 #[tokio::test]
1331 async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values() {
1332 BloomFilterTest::new_data_index_bloom_encoding_stats()
1333 .with_expect_none_pruned()
1334 .run(
1336 col(r#""String""#)
1337 .eq(lit("Hello"))
1338 .or(col(r#""String""#).eq(lit("the quick")))
1339 .or(col(r#""String""#).eq(lit("are you"))),
1340 )
1341 .await
1342 }
1343
1344 #[tokio::test]
1345 async fn test_row_group_bloom_filter_pruning_predicate_with_exists_3_values_view() {
1346 BloomFilterTest::new_data_index_bloom_encoding_stats()
1347 .with_expect_none_pruned()
1348 .run(
1350 col(r#""String""#)
1351 .eq(Expr::Literal(
1352 ScalarValue::Utf8View(Some(String::from("Hello"))),
1353 None,
1354 ))
1355 .or(col(r#""String""#).eq(Expr::Literal(
1356 ScalarValue::Utf8View(Some(String::from("the quick"))),
1357 None,
1358 )))
1359 .or(col(r#""String""#).eq(Expr::Literal(
1360 ScalarValue::Utf8View(Some(String::from("are you"))),
1361 None,
1362 ))),
1363 )
1364 .await
1365 }
1366
1367 #[tokio::test]
1368 async fn test_row_group_bloom_filter_pruning_predicate_with_or_not_eq() {
1369 BloomFilterTest::new_data_index_bloom_encoding_stats()
1370 .with_expect_none_pruned()
1371 .run(
1373 col(r#""String""#)
1374 .not_eq(lit("foo"))
1375 .or(col(r#""String""#).not_eq(lit("bar"))),
1376 )
1377 .await
1378 }
1379
1380 #[tokio::test]
1381 async fn test_row_group_bloom_filter_pruning_predicate_without_bloom_filter() {
1382 BloomFilterTest::new_all_types()
1384 .with_expect_none_pruned()
1385 .run(col(r#""string_col""#).eq(lit("0")))
1386 .await
1387 }
1388
/// The outcome a pruning test expects, checked by `ExpectedPruning::assert`.
#[derive(Debug)]
enum ExpectedPruning {
    /// Every row group in the access plan must have been pruned.
    All,
    /// The access plan's `row_group_indexes()` must equal exactly this list.
    Some(Vec<usize>),
    /// No row group may have been pruned.
    None,
}
1397
1398 impl ExpectedPruning {
1399 fn assert(&self, row_groups: &RowGroupAccessPlanFilter) {
1401 let num_row_groups = row_groups.access_plan.len();
1402 assert!(num_row_groups > 0);
1403 let num_pruned = (0..num_row_groups)
1404 .filter_map(|i| {
1405 if row_groups.access_plan.should_scan(i) {
1406 None
1407 } else {
1408 Some(1)
1409 }
1410 })
1411 .sum::<usize>();
1412
1413 match self {
1414 Self::All => {
1415 assert_eq!(
1416 num_row_groups, num_pruned,
1417 "Expected all row groups to be pruned, but got {row_groups:?}"
1418 );
1419 }
1420 ExpectedPruning::None => {
1421 assert_eq!(
1422 num_pruned, 0,
1423 "Expected no row groups to be pruned, but got {row_groups:?}"
1424 );
1425 }
1426 ExpectedPruning::Some(expected) => {
1427 let actual = row_groups.access_plan.row_group_indexes();
1428 assert_eq!(expected, &actual, "Unexpected row groups pruned. Expected {expected:?}, got {actual:?}");
1429 }
1430 }
1431 }
1432 }
1433
1434 fn assert_pruned(row_groups: RowGroupAccessPlanFilter, expected: ExpectedPruning) {
1435 expected.assert(&row_groups);
1436 }
1437
1438 struct BloomFilterTest {
1439 file_name: String,
1440 schema: Schema,
1441 post_pruning_row_groups: ExpectedPruning,
1443 }
1444
1445 impl BloomFilterTest {
1446 fn new_data_index_bloom_encoding_stats() -> Self {
1470 Self {
1471 file_name: String::from("data_index_bloom_encoding_stats.parquet"),
1472 schema: Schema::new(vec![Field::new("String", DataType::Utf8, false)]),
1473 post_pruning_row_groups: ExpectedPruning::None,
1474 }
1475 }
1476
1477 fn new_all_types() -> Self {
1479 Self {
1480 file_name: String::from("alltypes_plain.parquet"),
1481 schema: Schema::new(vec![Field::new(
1482 "string_col",
1483 DataType::Utf8,
1484 false,
1485 )]),
1486 post_pruning_row_groups: ExpectedPruning::None,
1487 }
1488 }
1489
1490 pub fn with_expect_all_pruned(mut self) -> Self {
1492 self.post_pruning_row_groups = ExpectedPruning::All;
1493 self
1494 }
1495
1496 pub fn with_expect_none_pruned(mut self) -> Self {
1498 self.post_pruning_row_groups = ExpectedPruning::None;
1499 self
1500 }
1501
1502 async fn run(self, expr: Expr) {
1504 let Self {
1505 file_name,
1506 schema,
1507 post_pruning_row_groups,
1508 } = self;
1509
1510 let testdata = datafusion_common::test_util::parquet_test_data();
1511 let path = format!("{testdata}/{file_name}");
1512 let data = bytes::Bytes::from(std::fs::read(path).unwrap());
1513
1514 let expr = logical2physical(&expr, &schema);
1515 let pruning_predicate =
1516 PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
1517
1518 let pruned_row_groups = test_row_group_bloom_filter_pruning_predicate(
1519 &file_name,
1520 data,
1521 &pruning_predicate,
1522 )
1523 .await
1524 .unwrap();
1525
1526 post_pruning_row_groups.assert(&pruned_row_groups);
1527 }
1528 }
1529
1530 async fn test_row_group_bloom_filter_pruning_predicate(
1532 file_name: &str,
1533 data: bytes::Bytes,
1534 pruning_predicate: &PruningPredicate,
1535 ) -> Result<RowGroupAccessPlanFilter> {
1536 use object_store::{ObjectMeta, ObjectStore};
1537
1538 let object_meta = ObjectMeta {
1539 location: object_store::path::Path::parse(file_name).expect("creating path"),
1540 last_modified: chrono::DateTime::from(std::time::SystemTime::now()),
1541 size: data.len() as u64,
1542 e_tag: None,
1543 version: None,
1544 };
1545 let in_memory = object_store::memory::InMemory::new();
1546 in_memory
1547 .put(&object_meta.location, data.into())
1548 .await
1549 .expect("put parquet file into in memory object store");
1550
1551 let metrics = ExecutionPlanMetricsSet::new();
1552 let file_metrics =
1553 ParquetFileMetrics::new(0, object_meta.location.as_ref(), &metrics);
1554 let inner = ParquetObjectReader::new(Arc::new(in_memory), object_meta.location)
1555 .with_file_size(object_meta.size);
1556
1557 let reader = ParquetFileReader {
1558 inner,
1559 file_metrics: file_metrics.clone(),
1560 };
1561 let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await.unwrap();
1562
1563 let access_plan = ParquetAccessPlan::new_all(builder.metadata().num_row_groups());
1564 let mut pruned_row_groups = RowGroupAccessPlanFilter::new(access_plan);
1565 pruned_row_groups
1566 .prune_by_bloom_filters(
1567 pruning_predicate.schema(),
1568 &mut builder,
1569 pruning_predicate,
1570 &file_metrics,
1571 )
1572 .await;
1573
1574 Ok(pruned_row_groups)
1575 }
1576}