// datafusion_datasource_parquet/row_filter.rs

use std::cmp::Ordering;
use std::collections::BTreeSet;
use std::sync::Arc;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::Result;
use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaMapper};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr, PhysicalExprExt};

use datafusion_physical_plan::metrics;

use super::ParquetFileMetrics;

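/// A predicate that can be passed to the parquet reader as a [`RowFilter`]
/// stage: it adapts each incoming [`RecordBatch`] via a [`SchemaMapper`],
/// evaluates a [`PhysicalExpr`] against it, and records pruning metrics.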
#[derive(Debug)]
pub(crate) struct DatafusionArrowPredicate {
    /// The expression to evaluate against each record batch
    physical_expr: Arc<dyn PhysicalExpr>,
    /// The columns in the parquet file needed to evaluate the expression
    projection_mask: ProjectionMask,
    /// How many rows were filtered out by this predicate
    rows_pruned: metrics::Count,
    /// How many rows passed this predicate
    rows_matched: metrics::Count,
    /// How long was spent evaluating this predicate
    time: metrics::Time,
    /// Used to perform type coercion while filtering rows
    schema_mapper: Arc<dyn SchemaMapper>,
}

impl DatafusionArrowPredicate {
    pub fn try_new(
        candidate: FilterCandidate,
        metadata: &ParquetMetaData,
        rows_pruned: metrics::Count,
        rows_matched: metrics::Count,
        time: metrics::Time,
    ) -> Result<Self> {
        let physical_expr =
            reassign_expr_columns(candidate.expr, &candidate.filter_schema)?;

        Ok(Self {
            physical_expr,
            projection_mask: ProjectionMask::roots(
                metadata.file_metadata().schema_descr(),
                candidate.projection,
            ),
            rows_pruned,
            rows_matched,
            time,
            schema_mapper: candidate.schema_mapper,
        })
    }
}

impl ArrowPredicate for DatafusionArrowPredicate {
    fn projection(&self) -> &ProjectionMask {
        &self.projection_mask
    }

    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        // Adapt the batch from the file schema to the filter schema,
        // performing any necessary type coercion
        let batch = self.schema_mapper.map_batch(batch)?;

        let mut timer = self.time.timer();

        self.physical_expr
            .evaluate(&batch)
            .and_then(|v| v.into_array(batch.num_rows()))
            .and_then(|array| {
                let bool_arr = as_boolean_array(&array)?.clone();
                // Track how many rows this predicate kept vs. pruned
                let num_matched = bool_arr.true_count();
                let num_pruned = bool_arr.len() - num_matched;
                self.rows_pruned.add(num_pruned);
                self.rows_matched.add(num_matched);
                timer.stop();
                Ok(bool_arr)
            })
            .map_err(|e| {
                ArrowError::ComputeError(format!(
                    "Error evaluating filter predicate: {e:?}"
                ))
            })
    }
}

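/// A candidate for a pushed-down row filter: the filter expression together
/// with everything needed to evaluate it against the parquet file.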
pub(crate) struct FilterCandidate {
    /// The filter expression to evaluate
    expr: Arc<dyn PhysicalExpr>,
    /// Estimated total compressed size of the columns this filter reads
    required_bytes: usize,
    /// Whether the required columns are sorted, so an index could be used
    can_use_index: bool,
    /// Indices of the required columns in the file schema
    projection: Vec<usize>,
    /// Maps batches read from the file schema to the filter's schema
    schema_mapper: Arc<dyn SchemaMapper>,
    /// The schema the filter expression is evaluated against
    filter_schema: SchemaRef,
}

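/// Builds a [`FilterCandidate`] from an expression, checking that the
/// expression can actually be pushed down into the parquet scan.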
struct FilterCandidateBuilder {
    expr: Arc<dyn PhysicalExpr>,
    /// The schema of the parquet file itself
    file_schema: SchemaRef,
    /// The schema of the table (may contain columns the file does not)
    table_schema: SchemaRef,
    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
}

impl FilterCandidateBuilder {
    pub fn new(
        expr: Arc<dyn PhysicalExpr>,
        file_schema: Arc<Schema>,
        table_schema: Arc<Schema>,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Self {
        Self {
            expr,
            file_schema,
            table_schema,
            schema_adapter_factory,
        }
    }

    /// Attempts to build a `FilterCandidate`; returns `Ok(None)` if the
    /// expression cannot be pushed down (e.g. it references nested or
    /// missing columns).
    pub fn build(self, metadata: &ParquetMetaData) -> Result<Option<FilterCandidate>> {
        let Some(required_indices_into_table_schema) =
            pushdown_columns(&self.expr, &self.table_schema)?
        else {
            return Ok(None);
        };

        let projected_table_schema = Arc::new(
            self.table_schema
                .project(&required_indices_into_table_schema)?,
        );

        // Map the projected table schema onto the file schema, yielding both
        // a schema mapper (for type coercion) and the indices of the
        // required columns within the file
        let (schema_mapper, projection_into_file_schema) = self
            .schema_adapter_factory
            .create(Arc::clone(&projected_table_schema), self.table_schema)
            .map_schema(&self.file_schema)?;

        let required_bytes = size_of_columns(&projection_into_file_schema, metadata)?;
        let can_use_index = columns_sorted(&projection_into_file_schema, metadata)?;

        Ok(Some(FilterCandidate {
            expr: self.expr,
            required_bytes,
            can_use_index,
            projection: projection_into_file_schema,
            schema_mapper: Arc::clone(&schema_mapper),
            filter_schema: Arc::clone(&projected_table_schema),
        }))
    }
}

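/// Walks a [`PhysicalExpr`] tree and determines whether it references only
/// columns that are eligible for parquet filter pushdown.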
struct PushdownChecker<'schema> {
    /// Does the expression require any non-primitive (nested) columns?
    non_primitive_columns: bool,
    /// Does the expression reference any columns missing from the schema?
    projected_columns: bool,
    /// Indices into the schema of the columns the expression needs
    required_columns: BTreeSet<usize>,
    table_schema: &'schema Schema,
}

impl<'schema> PushdownChecker<'schema> {
    fn new(table_schema: &'schema Schema) -> Self {
        Self {
            non_primitive_columns: false,
            projected_columns: false,
            required_columns: BTreeSet::default(),
            table_schema,
        }
    }

    fn check_single_column(&mut self, column_name: &str) -> Option<TreeNodeRecursion> {
        if let Ok(idx) = self.table_schema.index_of(column_name) {
            self.required_columns.insert(idx);
            if DataType::is_nested(self.table_schema.field(idx).data_type()) {
                self.non_primitive_columns = true;
                return Some(TreeNodeRecursion::Jump);
            }
        } else {
            // The expression references a column that is not in the schema,
            // so pushdown is not possible
            self.projected_columns = true;
            return Some(TreeNodeRecursion::Jump);
        }

        None
    }

    #[inline]
    fn prevents_pushdown(&self) -> bool {
        self.non_primitive_columns || self.projected_columns
    }

    /// Checks an expression tree, ignoring columns that are bound as
    /// lambda parameters rather than table columns
    fn check(&mut self, node: Arc<dyn PhysicalExpr>) -> Result<TreeNodeRecursion> {
        node.apply_with_lambdas_params(|node, lambdas_params| {
            if let Some(column) = node.as_any().downcast_ref::<Column>() {
                if !lambdas_params.contains(column.name()) {
                    if let Some(recursion) = self.check_single_column(column.name()) {
                        return Ok(recursion);
                    }
                }
            }

            Ok(TreeNodeRecursion::Continue)
        })
    }
}

impl TreeNodeVisitor<'_> for PushdownChecker<'_> {
    type Node = Arc<dyn PhysicalExpr>;

    fn f_down(&mut self, node: &Self::Node) -> Result<TreeNodeRecursion> {
        if let Some(column) = node.as_any().downcast_ref::<Column>() {
            if let Some(recursion) = self.check_single_column(column.name()) {
                return Ok(recursion);
            }
        }

        Ok(TreeNodeRecursion::Continue)
    }
}

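/// Returns the indices of the columns that `expr` requires from
/// `table_schema`, or `None` if the expression cannot be pushed down.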
fn pushdown_columns(
    expr: &Arc<dyn PhysicalExpr>,
    table_schema: &Schema,
) -> Result<Option<Vec<usize>>> {
    let mut checker = PushdownChecker::new(table_schema);
    expr.visit(&mut checker)?;
    Ok((!checker.prevents_pushdown())
        .then_some(checker.required_columns.into_iter().collect()))
}

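/// Returns `true` if `expr` can be evaluated entirely against columns in
/// `file_schema`, i.e. it is safe to push down as a parquet row filter.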
pub fn can_expr_be_pushed_down_with_schemas(
    expr: &Arc<dyn PhysicalExpr>,
    file_schema: &Schema,
) -> bool {
    match pushdown_columns(expr, file_schema) {
        Ok(Some(_)) => true,
        Ok(None) | Err(_) => false,
    }
}

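/// Calculates the total compressed size, across all row groups, of the
/// columns required to evaluate the predicate; a proxy for evaluation cost.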
fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
    let mut total_size = 0;
    let row_groups = metadata.row_groups();
    for idx in columns {
        for rg in row_groups.iter() {
            total_size += rg.column(*idx).compressed_size() as usize;
        }
    }

    Ok(total_size)
}

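/// Would the columns be sorted, allowing an index to be used? Currently a
/// placeholder that always returns `false`.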
fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
    Ok(false)
}

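/// Builds a [`RowFilter`] from the given predicate: the predicate is split
/// into conjuncts, and each conjunct that can be pushed down becomes a
/// [`DatafusionArrowPredicate`]. If `reorder_predicates` is true, the
/// candidates are sorted so filters that read fewer bytes are evaluated
/// first. Returns `Ok(None)` if no conjunct can be pushed down.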
pub fn build_row_filter(
    expr: &Arc<dyn PhysicalExpr>,
    physical_file_schema: &SchemaRef,
    predicate_file_schema: &SchemaRef,
    metadata: &ParquetMetaData,
    reorder_predicates: bool,
    file_metrics: &ParquetFileMetrics,
    schema_adapter_factory: &Arc<dyn SchemaAdapterFactory>,
) -> Result<Option<RowFilter>> {
    let rows_pruned = &file_metrics.pushdown_rows_pruned;
    let rows_matched = &file_metrics.pushdown_rows_matched;
    let time = &file_metrics.row_pushdown_eval_time;

    // Split the predicate into individual conjuncts (`a AND b` -> [a, b])
    let predicates = split_conjunction(expr);

    // Build a candidate for each conjunct that can be pushed down
    let mut candidates: Vec<FilterCandidate> = predicates
        .into_iter()
        .map(|expr| {
            FilterCandidateBuilder::new(
                Arc::clone(expr),
                Arc::clone(physical_file_schema),
                Arc::clone(predicate_file_schema),
                Arc::clone(schema_adapter_factory),
            )
            .build(metadata)
        })
        .collect::<Result<Vec<_>, _>>()?
        .into_iter()
        .flatten()
        .collect();

    // No filters can be pushed down
    if candidates.is_empty() {
        return Ok(None);
    }

    if reorder_predicates {
        candidates.sort_unstable_by(|c1, c2| {
            match c1.can_use_index.cmp(&c2.can_use_index) {
                Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
                ord => ord,
            }
        });
    }

    candidates
        .into_iter()
        .map(|candidate| {
            DatafusionArrowPredicate::try_new(
                candidate,
                metadata,
                rows_pruned.clone(),
                rows_matched.clone(),
                time.clone(),
            )
            .map(|pred| Box::new(pred) as _)
        })
        .collect::<Result<Vec<_>, _>>()
        .map(|filters| Some(RowFilter::new(filters)))
}

#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::ScalarValue;

    use arrow::datatypes::{Field, TimeUnit::Nanosecond};
    use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
    use datafusion_expr::{col, Expr};
    use datafusion_physical_expr::planner::logical2physical;
    use datafusion_physical_plan::metrics::{Count, Time};

    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::arrow::parquet_to_arrow_schema;
    use parquet::file::reader::{FileReader, SerializedFileReader};

    // Predicates on nested (list) columns should not produce a candidate
    #[test]
    fn test_filter_candidate_builder_ignore_complex_types() {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        let table_schema =
            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
                .expect("parsing schema");

        let expr = col("int64_list").is_not_null();
        let expr = logical2physical(&expr, &table_schema);

        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let table_schema = Arc::new(table_schema.clone());

        let candidate = FilterCandidateBuilder::new(
            expr,
            table_schema.clone(),
            table_schema,
            schema_adapter_factory,
        )
        .build(metadata)
        .expect("building candidate");

        assert!(candidate.is_none());
    }

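    // The table schema declares `timestamp_col` as a UTC nanosecond
    // timestamp, which differs from the file's native type, so the schema
    // adapter must coerce the column before the filter can be evaluated.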
    #[test]
    fn test_filter_type_coercion() {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
            .expect("opening file");

        let parquet_reader_builder =
            ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
        let metadata = parquet_reader_builder.metadata().clone();
        let file_schema = parquet_reader_builder.schema().clone();

        // The table schema is not the same as the file schema, so a cast
        // is required to evaluate the predicate
        let table_schema = Schema::new(vec![Field::new(
            "timestamp_col",
            DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
            false,
        )]);

        let expr = col("timestamp_col").lt(Expr::Literal(
            ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
            None,
        ));
        let expr = logical2physical(&expr, &table_schema);
        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let table_schema = Arc::new(table_schema.clone());
        let candidate = FilterCandidateBuilder::new(
            expr,
            file_schema.clone(),
            table_schema.clone(),
            schema_adapter_factory,
        )
        .build(&metadata)
        .expect("building candidate")
        .expect("candidate expected");

        let mut row_filter = DatafusionArrowPredicate::try_new(
            candidate,
            &metadata,
            Count::new(),
            Count::new(),
            Time::new(),
        )
        .expect("creating filter predicate");

        let mut parquet_reader = parquet_reader_builder
            .with_projection(row_filter.projection().clone())
            .build()
            .expect("building reader");

        // The file is small, so a single record batch holds all the rows
        let first_rb = parquet_reader
            .next()
            .expect("expected record batch")
            .expect("expected error free record batch");

        let filtered = row_filter.evaluate(first_rb.clone());
        assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));

        let expr = col("timestamp_col").gt(Expr::Literal(
            ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
            None,
        ));
        let expr = logical2physical(&expr, &table_schema);
        let schema_adapter_factory = Arc::new(DefaultSchemaAdapterFactory);
        let candidate = FilterCandidateBuilder::new(
            expr,
            file_schema,
            table_schema,
            schema_adapter_factory,
        )
        .build(&metadata)
        .expect("building candidate")
        .expect("candidate expected");

        let mut row_filter = DatafusionArrowPredicate::try_new(
            candidate,
            &metadata,
            Count::new(),
            Count::new(),
            Time::new(),
        )
        .expect("creating filter predicate");

        let filtered = row_filter.evaluate(first_rb);
        assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
    }

    #[test]
    fn nested_data_structures_prevent_pushdown() {
        let table_schema = Arc::new(get_lists_table_schema());

        let expr = col("utf8_list").is_not_null();
        let expr = logical2physical(&expr, &table_schema);
        // The expression is evaluable against the schema, but still must
        // not be pushed down because the column is nested
        assert!(check_expression_can_evaluate_against_schema(
            &expr,
            &table_schema
        ));

        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn projected_columns_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr =
            Arc::new(Column::new("nonexistent_column", 0)) as Arc<dyn PhysicalExpr>;

        assert!(!can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn basic_expr_doesnt_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr = col("string_col").is_null();
        let expr = logical2physical(&expr, &table_schema);

        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    #[test]
    fn complex_expr_doesnt_prevent_pushdown() {
        let table_schema = get_basic_table_schema();

        let expr = col("string_col")
            .is_not_null()
            .or(col("bigint_col").gt(Expr::Literal(ScalarValue::Int64(Some(5)), None)));
        let expr = logical2physical(&expr, &table_schema);

        assert!(can_expr_be_pushed_down_with_schemas(&expr, &table_schema));
    }

    fn get_basic_table_schema() -> Schema {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
            .expect("parsing schema")
    }

    fn get_lists_table_schema() -> Schema {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let file = std::fs::File::open(format!("{testdata}/list_columns.parquet"))
            .expect("opening file");

        let reader = SerializedFileReader::new(file).expect("creating reader");

        let metadata = reader.metadata();

        parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None)
            .expect("parsing schema")
    }

    /// Returns true if the expression can be evaluated against an empty
    /// batch with the given schema
    fn check_expression_can_evaluate_against_schema(
        expr: &Arc<dyn PhysicalExpr>,
        table_schema: &Arc<Schema>,
    ) -> bool {
        let batch = RecordBatch::new_empty(Arc::clone(table_schema));
        expr.evaluate(&batch).is_ok()
    }
}