use crate::page_filter::PagePruningAccessPlanFilter;
use crate::row_group_filter::RowGroupAccessPlanFilter;
use crate::{
    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
};
use arrow::array::RecordBatch;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;

use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_datasource::PartitionedFile;
use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::{
    is_dynamic_physical_expr, PhysicalExpr,
};
use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};

#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use futures::{ready, Stream, StreamExt, TryStreamExt};
use itertools::Itertools;
use log::debug;
use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};

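/// Implements [`FileOpener`] for parquet files: opens a single file, applies
/// schema adaptation and predicate-based pruning, and returns a stream of
/// [`RecordBatch`]es.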
pub(super) struct ParquetOpener {
    /// Index of the partition in the output stream
    pub partition_index: usize,
    /// Column indexes in `logical_file_schema` needed by the query
    pub projection: Arc<[usize]>,
    /// Target number of rows in each output RecordBatch
    pub batch_size: usize,
    /// Optional limit on the number of rows to read
    pub limit: Option<usize>,
    /// Optional predicate to apply during the scan
    pub predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Logical (table-level) schema of the file
    pub logical_file_schema: SchemaRef,
    /// Fields of the table partition columns
    pub partition_fields: Vec<FieldRef>,
    /// Optional hint for how many bytes to read when fetching file metadata
    pub metadata_size_hint: Option<usize>,
    /// Metrics for reporting
    pub metrics: ExecutionPlanMetricsSet,
    /// Factory for instantiating parquet readers
    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
    /// Should row-level filters be pushed down into the parquet scan?
    pub pushdown_filters: bool,
    /// Should pushed-down filters be reordered for evaluation?
    pub reorder_filters: bool,
    /// Should the page index be read, if present, to skip pages?
    pub enable_page_index: bool,
    /// Should bloom filters be read, if present, to skip row groups?
    pub enable_bloom_filter: bool,
    /// Factory for adapting file schemas to the logical schema
    pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    /// Should row group statistics be used to prune row groups?
    pub enable_row_group_stats_pruning: bool,
    /// Optional resolution to coerce INT96 timestamps to
    pub coerce_int96: Option<TimeUnit>,
    /// Optional decryption properties for encrypted files
    #[cfg(feature = "parquet_encryption")]
    pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    /// Optional factory for adapting physical expressions to the file schema
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Optional factory to generate file decryption properties per file
    #[cfg(feature = "parquet_encryption")]
    pub encryption_factory:
        Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
    /// Optional cap on the size of the parquet reader's predicate cache
    pub max_predicate_cache_size: Option<usize>,
}

impl FileOpener for ParquetOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let file_range = partitioned_file.range.clone();
        let extensions = partitioned_file.extensions.clone();
        let file_location = partitioned_file.object_meta.location.clone();
        let file_name = file_location.to_string();
        let file_metrics =
            ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);

        let metadata_size_hint = partitioned_file
            .metadata_size_hint
            .or(self.metadata_size_hint);

        let mut async_file_reader: Box<dyn AsyncFileReader> =
            self.parquet_file_reader_factory.create_reader(
                self.partition_index,
                partitioned_file.clone(),
                metadata_size_hint,
                &self.metrics,
            )?;

        let batch_size = self.batch_size;

        let projected_schema =
            SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
        let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
        let schema_adapter = self
            .schema_adapter_factory
            .create(projected_schema, Arc::clone(&self.logical_file_schema));
        let mut predicate = self.predicate.clone();
        let logical_file_schema = Arc::clone(&self.logical_file_schema);
        let partition_fields = self.partition_fields.clone();
        let reorder_predicates = self.reorder_filters;
        let pushdown_filters = self.pushdown_filters;
        let coerce_int96 = self.coerce_int96;
        let enable_bloom_filter = self.enable_bloom_filter;
        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
        let limit = self.limit;

        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
            .global_counter("num_predicate_creation_errors");

        let expr_adapter_factory = self.expr_adapter_factory.clone();
        let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);

        let enable_page_index = self.enable_page_index;
        #[cfg(feature = "parquet_encryption")]
        let encryption_context = self.get_encryption_context();
        let max_predicate_cache_size = self.max_predicate_cache_size;

        Ok(Box::pin(async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = encryption_context
                .get_file_decryption_properties(&file_location)
                .await?;

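            // Try to prune the file away entirely using partition values and
            // file-level statistics: a FilePruner is only built if the
            // predicate is dynamic or the file has statistics attached.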
            let mut file_pruner = predicate
                .as_ref()
                .map(|p| {
                    Ok::<_, DataFusionError>(
                        (is_dynamic_physical_expr(p) || partitioned_file.has_statistics())
                            .then_some(FilePruner::new(
                                Arc::clone(p),
                                &logical_file_schema,
                                partition_fields.clone(),
                                partitioned_file.clone(),
                                predicate_creation_errors.clone(),
                            )?),
                    )
                })
                .transpose()?
                .flatten();

            if let Some(file_pruner) = &mut file_pruner {
                if file_pruner.should_prune()? {
                    // The whole file is pruned: return an empty stream
                    file_metrics.files_ranges_pruned_statistics.add_pruned(1);
                    return Ok(futures::stream::empty().boxed());
                }
            }

            file_metrics.files_ranges_pruned_statistics.add_matched(1);

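            // Don't load the page index yet. It is only needed if page
            // pruning predicates apply, in which case it is loaded below.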
            let mut options = ArrowReaderOptions::new().with_page_index(false);
            #[cfg(feature = "parquet_encryption")]
            if let Some(fd_val) = file_decryption_properties {
                options = options.with_file_decryption_properties(Arc::clone(&fd_val));
            }
            let mut metadata_timer = file_metrics.metadata_load_time.timer();

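            // Begin by loading the parquet metadata (footer) from the
            // underlying reader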
            let mut reader_metadata =
                ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
                    .await?;

            let mut physical_file_schema = Arc::clone(reader_metadata.schema());

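            // The schema read from the file may differ from the table's
            // logical schema (e.g. compatible but distinct types), so apply
            // any needed type coercions and, if anything changed, rebuild the
            // reader metadata against the merged schema.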
            if let Some(merged) = apply_file_schema_type_coercions(
                &logical_file_schema,
                &physical_file_schema,
            ) {
                physical_file_schema = Arc::new(merged);
                options = options.with_schema(Arc::clone(&physical_file_schema));
                reader_metadata = ArrowReaderMetadata::try_new(
                    Arc::clone(reader_metadata.metadata()),
                    options.clone(),
                )?;
            }

            if let Some(ref coerce) = coerce_int96 {
                if let Some(merged) = coerce_int96_to_resolution(
                    reader_metadata.parquet_schema(),
                    &physical_file_schema,
                    coerce,
                ) {
                    physical_file_schema = Arc::new(merged);
                    options = options.with_schema(Arc::clone(&physical_file_schema));
                    reader_metadata = ArrowReaderMetadata::try_new(
                        Arc::clone(reader_metadata.metadata()),
                        options.clone(),
                    )?;
                }
            }

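            // Adapt the predicate to the physical file schema, replacing
            // references to partition columns with their literal values.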
            if let Some(expr_adapter_factory) = expr_adapter_factory {
                predicate = predicate
                    .map(|p| {
                        let partition_values = partition_fields
                            .iter()
                            .cloned()
                            .zip(partitioned_file.partition_values)
                            .collect_vec();
                        let expr = expr_adapter_factory
                            .create(
                                Arc::clone(&logical_file_schema),
                                Arc::clone(&physical_file_schema),
                            )
                            .with_partition_values(partition_values)
                            .rewrite(p)?;
                        // Rewriting may open up new simplification opportunities
                        PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr)
                    })
                    .transpose()?;
                predicate_file_schema = Arc::clone(&physical_file_schema);
            }

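            // Build predicates for skipping row groups (via statistics) and
            // pages (via the page index) from the rewritten predicate.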
            let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
                predicate.as_ref(),
                &predicate_file_schema,
                &predicate_creation_errors,
            );

            if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
                reader_metadata = load_page_index(
                    reader_metadata,
                    &mut async_file_reader,
                    options.with_page_index(true),
                )
                .await?;
            }

            metadata_timer.stop();

            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
                async_file_reader,
                reader_metadata,
            );

            let (schema_mapping, adapted_projections) =
                schema_adapter.map_schema(&physical_file_schema)?;

            let mask = ProjectionMask::roots(
                builder.parquet_schema(),
                adapted_projections.iter().cloned(),
            );

            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                let row_filter = row_filter::build_row_filter(
                    &predicate,
                    &physical_file_schema,
                    &predicate_file_schema,
                    builder.metadata(),
                    reorder_predicates,
                    &file_metrics,
                    &schema_adapter_factory,
                );

                match row_filter {
                    Ok(Some(filter)) => {
                        builder = builder.with_row_filter(filter);
                    }
                    Ok(None) => {}
                    Err(e) => {
                        debug!(
                            "Ignoring error building row filter for '{predicate:?}': {e}"
                        );
                    }
                }
            }

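            // Determine which row groups to actually read: start from all row
            // groups, then prune by byte range, statistics, and bloom filters.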
            let file_metadata = Arc::clone(builder.metadata());
            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
            let rg_metadata = file_metadata.row_groups();
            let access_plan =
                create_initial_plan(&file_name, extensions, rg_metadata.len())?;
            let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
            if let Some(range) = file_range.as_ref() {
                row_groups.prune_by_range(rg_metadata, range);
            }

            if let Some(predicate) = predicate.as_ref() {
                if enable_row_group_stats_pruning {
                    row_groups.prune_by_statistics(
                        &physical_file_schema,
                        builder.parquet_schema(),
                        rg_metadata,
                        predicate,
                        &file_metrics,
                    );
                } else {
                    file_metrics
                        .row_groups_pruned_statistics
                        .add_matched(row_groups.remaining_row_group_count());
                }

                if enable_bloom_filter && !row_groups.is_empty() {
                    row_groups
                        .prune_by_bloom_filters(
                            &physical_file_schema,
                            &mut builder,
                            predicate,
                            &file_metrics,
                        )
                        .await;
                } else {
                    file_metrics
                        .row_groups_pruned_bloom_filter
                        .add_matched(row_groups.remaining_row_group_count());
                }
            } else {
                let n_remaining_row_groups = row_groups.remaining_row_group_count();
                file_metrics
                    .row_groups_pruned_statistics
                    .add_matched(n_remaining_row_groups);
                file_metrics
                    .row_groups_pruned_bloom_filter
                    .add_matched(n_remaining_row_groups);
            }

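            // If the page index is available, prune individual pages within
            // the remaining row groups.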
            let mut access_plan = row_groups.build();

            if enable_page_index && !access_plan.is_empty() {
                if let Some(p) = page_pruning_predicate {
                    access_plan = p.prune_plan_with_page_index(
                        access_plan,
                        &physical_file_schema,
                        builder.parquet_schema(),
                        file_metadata.as_ref(),
                        &file_metrics,
                    );
                }
            }

            let row_group_indexes = access_plan.row_group_indexes();
            if let Some(row_selection) =
                access_plan.into_overall_row_selection(rg_metadata)?
            {
                builder = builder.with_row_selection(row_selection);
            }

            if let Some(limit) = limit {
                builder = builder.with_limit(limit)
            }

            if let Some(max_predicate_cache_size) = max_predicate_cache_size {
                builder = builder.with_max_predicate_cache_size(max_predicate_cache_size);
            }

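            // Enable the parquet reader's internal metrics so the predicate
            // cache counters can be copied into DataFusion's metrics below.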
            let arrow_reader_metrics = ArrowReaderMetrics::enabled();

            let stream = builder
                .with_projection(mask)
                .with_batch_size(batch_size)
                .with_row_groups(row_group_indexes)
                .with_metrics(arrow_reader_metrics.clone())
                .build()?;

            let files_ranges_pruned_statistics =
                file_metrics.files_ranges_pruned_statistics.clone();
            let predicate_cache_inner_records =
                file_metrics.predicate_cache_inner_records.clone();
            let predicate_cache_records = file_metrics.predicate_cache_records.clone();

            let stream = stream.map_err(DataFusionError::from).map(move |b| {
                b.and_then(|b| {
                    copy_arrow_reader_metrics(
                        &arrow_reader_metrics,
                        &predicate_cache_inner_records,
                        &predicate_cache_records,
                    );
                    schema_mapping.map_batch(b)
                })
            });

            if let Some(file_pruner) = file_pruner {
                Ok(EarlyStoppingStream::new(
                    stream,
                    file_pruner,
                    files_ranges_pruned_statistics,
                )
                .boxed())
            } else {
                Ok(stream.boxed())
            }
        }))
    }
}

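/// Copies the parquet reader's predicate cache counters into the
/// corresponding DataFusion metrics.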
fn copy_arrow_reader_metrics(
    arrow_reader_metrics: &ArrowReaderMetrics,
    predicate_cache_inner_records: &Count,
    predicate_cache_records: &Count,
) {
    if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
        predicate_cache_inner_records.add(v);
    }

    if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
        predicate_cache_records.add(v);
    }
}

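/// Wraps an inner RecordBatch stream and stops polling it as soon as the
/// `FilePruner` determines the rest of the file can be skipped (e.g. after a
/// dynamic filter has been updated).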
struct EarlyStoppingStream<S> {
    /// Has the inner stream been exhausted or the file been pruned?
    done: bool,
    /// Re-evaluates whether the rest of the file can be pruned
    file_pruner: FilePruner,
    files_ranges_pruned_statistics: PruningMetrics,
    /// The inner stream of RecordBatches
    inner: S,
}

impl<S> EarlyStoppingStream<S> {
    pub fn new(
        stream: S,
        file_pruner: FilePruner,
        files_ranges_pruned_statistics: PruningMetrics,
    ) -> Self {
        Self {
            done: false,
            inner: stream,
            file_pruner,
            files_ranges_pruned_statistics,
        }
    }
}

impl<S> EarlyStoppingStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    fn check_prune(
        &mut self,
        input: Result<RecordBatch>,
    ) -> Result<Option<RecordBatch>> {
        let batch = input?;

        // A dynamic filter may have been updated since the last batch, so
        // re-check: if the rest of the file can now be pruned, stop emitting
        // batches and correct the metrics recorded when the file was opened.
        if self.file_pruner.should_prune()? {
            self.files_ranges_pruned_statistics.add_pruned(1);
            self.files_ranges_pruned_statistics.subtract_matched(1);
            self.done = true;
            Ok(None)
        } else {
            Ok(Some(batch))
        }
    }
}

impl<S> Stream for EarlyStoppingStream<S>
where
    S: Stream<Item = Result<RecordBatch>> + Unpin,
{
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.done {
            return Poll::Ready(None);
        }
        match ready!(self.inner.poll_next_unpin(cx)) {
            None => {
                // Inner stream is exhausted
                self.done = true;
                Poll::Ready(None)
            }
            Some(input_batch) => {
                let output = self.check_prune(input_batch);
                Poll::Ready(output.transpose())
            }
        }
    }
}

#[derive(Default)]
#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
struct EncryptionContext {
    #[cfg(feature = "parquet_encryption")]
    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
    #[cfg(feature = "parquet_encryption")]
    encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
}

#[cfg(feature = "parquet_encryption")]
impl EncryptionContext {
    fn new(
        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
        encryption_factory: Option<(
            Arc<dyn EncryptionFactory>,
            EncryptionFactoryOptions,
        )>,
    ) -> Self {
        Self {
            file_decryption_properties,
            encryption_factory,
        }
    }

    async fn get_file_decryption_properties(
        &self,
        file_location: &object_store::path::Path,
    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
        match &self.file_decryption_properties {
            Some(file_decryption_properties) => {
                Ok(Some(Arc::clone(file_decryption_properties)))
            }
            None => match &self.encryption_factory {
                Some((encryption_factory, encryption_config)) => Ok(encryption_factory
                    .get_file_decryption_properties(encryption_config, file_location)
                    .await?),
                None => Ok(None),
            },
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
#[allow(dead_code)]
impl EncryptionContext {
    async fn get_file_decryption_properties(
        &self,
        _file_location: &object_store::path::Path,
    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
        Ok(None)
    }
}

impl ParquetOpener {
    #[cfg(feature = "parquet_encryption")]
    fn get_encryption_context(&self) -> EncryptionContext {
        EncryptionContext::new(
            self.file_decryption_properties.clone(),
            self.encryption_factory.clone(),
        )
    }

    #[cfg(not(feature = "parquet_encryption"))]
    #[allow(dead_code)]
    fn get_encryption_context(&self) -> EncryptionContext {
        EncryptionContext::default()
    }
}

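/// Computes the initial access plan for the file: if a `ParquetAccessPlan`
/// was supplied via the `PartitionedFile` extensions, validate and use it;
/// otherwise plan to scan all row groups.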
fn create_initial_plan(
    file_name: &str,
    extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
    row_group_count: usize,
) -> Result<ParquetAccessPlan> {
    if let Some(extensions) = extensions {
        if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
            let plan_len = access_plan.len();
            if plan_len != row_group_count {
                return exec_err!(
                    "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
                );
            }

            return Ok(access_plan.clone());
        } else {
            debug!("DataSourceExec Ignoring unknown extension specified for {file_name}");
        }
    }

    // default to scanning all row groups
    Ok(ParquetAccessPlan::new_all(row_group_count))
}

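/// Builds a page pruning predicate (a [`PagePruningAccessPlanFilter`]) from a
/// physical predicate and the file schema.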
pub(crate) fn build_page_pruning_predicate(
    predicate: &Arc<dyn PhysicalExpr>,
    file_schema: &SchemaRef,
) -> Arc<PagePruningAccessPlanFilter> {
    Arc::new(PagePruningAccessPlanFilter::new(
        predicate,
        Arc::clone(file_schema),
    ))
}

/// Builds both the row-group-level and the page-level pruning predicates for
/// an optional predicate expression.
pub(crate) fn build_pruning_predicates(
    predicate: Option<&Arc<dyn PhysicalExpr>>,
    file_schema: &SchemaRef,
    predicate_creation_errors: &Count,
) -> (
    Option<Arc<PruningPredicate>>,
    Option<Arc<PagePruningAccessPlanFilter>>,
) {
    let Some(predicate) = predicate.as_ref() else {
        return (None, None);
    };
    let pruning_predicate = build_pruning_predicate(
        Arc::clone(predicate),
        file_schema,
        predicate_creation_errors,
    );
    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
    (pruning_predicate, Some(page_pruning_predicate))
}

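/// Loads the page index (column index and offset index) into the reader
/// metadata if it is not already present.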
async fn load_page_index<T: AsyncFileReader>(
    reader_metadata: ArrowReaderMetadata,
    input: &mut T,
    options: ArrowReaderOptions,
) -> Result<ArrowReaderMetadata> {
    let parquet_metadata = reader_metadata.metadata();
    let missing_column_index = parquet_metadata.column_index().is_none();
    let missing_offset_index = parquet_metadata.offset_index().is_none();
    if missing_column_index || missing_offset_index {
        // The page index is missing: load it and rebuild the reader metadata,
        // reusing the already-loaded metadata rather than re-reading the footer
        let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
            .unwrap_or_else(|e| e.as_ref().clone());
        let mut reader = ParquetMetaDataReader::new_with_metadata(m)
            .with_page_index_policy(PageIndexPolicy::Optional);
        reader.load_page_index(input).await?;
        let new_parquet_metadata = reader.finish()?;
        let new_arrow_reader =
            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
        Ok(new_arrow_reader)
    } else {
        // The page index is already loaded
        Ok(reader_metadata)
    }
}

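/// Returns true if the page index should be loaded: only when it is enabled
/// and there is at least one page pruning predicate to apply.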
fn should_enable_page_index(
    enable_page_index: bool,
    page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
) -> bool {
    enable_page_index
        && page_pruning_predicate.is_some()
        && page_pruning_predicate
            .as_ref()
            .map(|p| p.filter_number() > 0)
            .unwrap_or(false)
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::{
        compute::cast,
        datatypes::{DataType, Field, Schema, SchemaRef},
    };
    use bytes::{BufMut, BytesMut};
    use datafusion_common::{
        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
        DataFusionError, ScalarValue, Statistics,
    };
    use datafusion_datasource::{
        file_stream::FileOpener,
        schema_adapter::{
            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
            SchemaMapper,
        },
        PartitionedFile,
    };
    use datafusion_expr::{col, lit};
    use datafusion_physical_expr::{
        expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
    };
    use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
    use futures::{Stream, StreamExt};
    use object_store::{memory::InMemory, path::Path, ObjectStore};
    use parquet::arrow::ArrowWriter;

    use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};

    async fn count_batches_and_rows(
        mut stream: std::pin::Pin<
            Box<
                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
                    + Send,
            >,
        >,
    ) -> (usize, usize) {
        let mut num_batches = 0;
        let mut num_rows = 0;
        while let Some(Ok(batch)) = stream.next().await {
            num_rows += batch.num_rows();
            num_batches += 1;
        }
        (num_batches, num_rows)
    }

    async fn collect_batches(
        mut stream: std::pin::Pin<
            Box<
                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
                    + Send,
            >,
        >,
    ) -> Vec<arrow::array::RecordBatch> {
        let mut batches = vec![];
        while let Some(Ok(batch)) = stream.next().await {
            batches.push(batch);
        }
        batches
    }

    async fn write_parquet(
        store: Arc<dyn ObjectStore>,
        filename: &str,
        batch: arrow::record_batch::RecordBatch,
    ) -> usize {
        let mut out = BytesMut::new().writer();
        {
            let mut writer =
                ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
            writer.write(&batch).unwrap();
            writer.finish().unwrap();
        }
        let data = out.into_inner().freeze();
        let data_len = data.len();
        store.put(&Path::from(filename), data.into()).await.unwrap();
        data_len
    }

    fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        Arc::new(DynamicFilterPhysicalExpr::new(
            expr.children().into_iter().map(Arc::clone).collect(),
            expr,
        ))
    }

    #[tokio::test]
    async fn test_prune_on_statistics() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let batch = record_batch!(
            ("a", Int32, vec![Some(1), Some(2), Some(2)]),
            ("b", Float32, vec![Some(1.0), Some(2.0), None])
        )
        .unwrap();

        let data_size =
            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;

        let schema = batch.schema();
        let file = PartitionedFile::new(
            "test.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        )
        .with_statistics(Arc::new(
            Statistics::new_unknown(&schema)
                .add_column_statistics(ColumnStatistics::new_unknown())
                .add_column_statistics(
                    ColumnStatistics::new_unknown()
                        .with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0))))
                        .with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0))))
                        .with_null_count(Precision::Exact(1)),
                ),
        ));

        let make_opener = |predicate| {
            ParquetOpener {
                partition_index: 0,
                projection: Arc::new([0, 1]),
                batch_size: 1024,
                limit: None,
                predicate: Some(predicate),
                logical_file_schema: schema.clone(),
                metadata_size_hint: None,
                metrics: ExecutionPlanMetricsSet::new(),
                parquet_file_reader_factory: Arc::new(
                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
                ),
                partition_fields: vec![],
                pushdown_filters: false,
                reorder_filters: false,
                enable_page_index: false,
                enable_bloom_filter: false,
                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                enable_row_group_stats_pruning: true,
                coerce_int96: None,
                #[cfg(feature = "parquet_encryption")]
                file_decryption_properties: None,
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
                max_predicate_cache_size: None,
            }
        };

        // Column `a` has unknown statistics, so a predicate on it cannot
        // prune the file
        let expr = col("a").eq(lit(1));
        let predicate = logical2physical(&expr, &schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // b = 5.0 is outside b's min/max statistics, so the file is pruned
        let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
        let predicate = logical2physical(&expr, &schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);
    }

    #[tokio::test]
    async fn test_prune_on_partition_statistics_with_dynamic_expression() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
        let data_size =
            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;

        let file_schema = batch.schema();
        let mut file = PartitionedFile::new(
            "part=1/file.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        );
        file.partition_values = vec![ScalarValue::Int32(Some(1))];

        let table_schema = Arc::new(Schema::new(vec![
            Field::new("part", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
        ]));

        let make_opener = |predicate| {
            ParquetOpener {
                partition_index: 0,
                projection: Arc::new([0]),
                batch_size: 1024,
                limit: None,
                predicate: Some(predicate),
                logical_file_schema: file_schema.clone(),
                metadata_size_hint: None,
                metrics: ExecutionPlanMetricsSet::new(),
                parquet_file_reader_factory: Arc::new(
                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
                ),
                partition_fields: vec![Arc::new(Field::new(
                    "part",
                    DataType::Int32,
                    false,
                ))],
                pushdown_filters: false,
                reorder_filters: false,
                enable_page_index: false,
                enable_bloom_filter: false,
                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                enable_row_group_stats_pruning: true,
                coerce_int96: None,
                #[cfg(feature = "parquet_encryption")]
                file_decryption_properties: None,
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
                max_predicate_cache_size: None,
            }
        };

        // The filter matches the partition value, so the file is read in full
        let expr = col("part").eq(lit(1));
        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // The filter does not match the partition value, so the file is pruned
        let expr = col("part").eq(lit(2));
        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
        let opener = make_opener(predicate);
        let stream = opener.open(file).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);
    }

    #[tokio::test]
    async fn test_prune_on_partition_values_and_file_statistics() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let batch = record_batch!(
            ("a", Int32, vec![Some(1), Some(2), Some(3)]),
            ("b", Float64, vec![Some(1.0), Some(2.0), None])
        )
        .unwrap();
        let data_size =
            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
        let file_schema = batch.schema();
        let mut file = PartitionedFile::new(
            "part=1/file.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        );
        file.partition_values = vec![ScalarValue::Int32(Some(1))];
        file.statistics = Some(Arc::new(
            Statistics::new_unknown(&file_schema)
                .add_column_statistics(ColumnStatistics::new_unknown())
                .add_column_statistics(
                    ColumnStatistics::new_unknown()
                        .with_min_value(Precision::Exact(ScalarValue::Float64(Some(1.0))))
                        .with_max_value(Precision::Exact(ScalarValue::Float64(Some(2.0))))
                        .with_null_count(Precision::Exact(1)),
                ),
        ));
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("part", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Float32, true),
        ]));
        let make_opener = |predicate| {
            ParquetOpener {
                partition_index: 0,
                projection: Arc::new([0]),
                batch_size: 1024,
                limit: None,
                predicate: Some(predicate),
                logical_file_schema: file_schema.clone(),
                metadata_size_hint: None,
                metrics: ExecutionPlanMetricsSet::new(),
                parquet_file_reader_factory: Arc::new(
                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
                ),
                partition_fields: vec![Arc::new(Field::new(
                    "part",
                    DataType::Int32,
                    false,
                ))],
                pushdown_filters: false,
                reorder_filters: false,
                enable_page_index: false,
                enable_bloom_filter: false,
                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                enable_row_group_stats_pruning: true,
                coerce_int96: None,
                #[cfg(feature = "parquet_encryption")]
                file_decryption_properties: None,
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
                max_predicate_cache_size: None,
            }
        };

        // Both the partition value and the statistics match: read the file
        let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // The partition value does not match: prune the file
        let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);

        // b = 7.0 is outside b's min/max statistics: prune the file
        let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);

        // Neither the partition value nor the statistics match: prune the file
        let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);
    }

    #[tokio::test]
    async fn test_prune_on_partition_value_and_data_value() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        // Note: the data value 3 is deliberately absent from the file
        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
        let data_size =
            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;

        let file_schema = batch.schema();
        let mut file = PartitionedFile::new(
            "part=1/file.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        );
        file.partition_values = vec![ScalarValue::Int32(Some(1))];

        let table_schema = Arc::new(Schema::new(vec![
            Field::new("part", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
        ]));

        let make_opener = |predicate| {
            ParquetOpener {
                partition_index: 0,
                projection: Arc::new([0]),
                batch_size: 1024,
                limit: None,
                predicate: Some(predicate),
                logical_file_schema: file_schema.clone(),
                metadata_size_hint: None,
                metrics: ExecutionPlanMetricsSet::new(),
                parquet_file_reader_factory: Arc::new(
                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
                ),
                partition_fields: vec![Arc::new(Field::new(
                    "part",
                    DataType::Int32,
                    false,
                ))],
                pushdown_filters: true,
                reorder_filters: true,
                enable_page_index: false,
                enable_bloom_filter: false,
                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                enable_row_group_stats_pruning: false,
                coerce_int96: None,
                #[cfg(feature = "parquet_encryption")]
                file_decryption_properties: None,
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
                max_predicate_cache_size: None,
            }
        };

        // The partition value matches, so all rows are returned
        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // The partition value still matches, regardless of the data value
        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // Only the row with a = 1 passes the pushed-down filter
        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 1);

        // Neither the partition value nor any data value matches
        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);
    }

    #[tokio::test]
    async fn test_opener_pruning_skipped_on_static_filters() {
        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;

        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
        let data_size =
            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;

        let file_schema = batch.schema();
        let mut file = PartitionedFile::new(
            "part=1/file.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        );
        file.partition_values = vec![ScalarValue::Int32(Some(1))];

        let table_schema = Arc::new(Schema::new(vec![
            Field::new("part", DataType::Int32, false),
            Field::new("a", DataType::Int32, false),
        ]));

        let make_opener = |predicate| {
            ParquetOpener {
                partition_index: 0,
                projection: Arc::new([0]),
                batch_size: 1024,
                limit: None,
                predicate: Some(predicate),
                logical_file_schema: file_schema.clone(),
                metadata_size_hint: None,
                metrics: ExecutionPlanMetricsSet::new(),
                parquet_file_reader_factory: Arc::new(
                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
                ),
                partition_fields: vec![Arc::new(Field::new(
                    "part",
                    DataType::Int32,
                    false,
                ))],
                pushdown_filters: false,
                reorder_filters: false,
                enable_page_index: false,
                enable_bloom_filter: false,
                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
                enable_row_group_stats_pruning: true,
                coerce_int96: None,
                #[cfg(feature = "parquet_encryption")]
                file_decryption_properties: None,
                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
                #[cfg(feature = "parquet_encryption")]
                encryption_factory: None,
                max_predicate_cache_size: None,
            }
        };

        // A static filter alone does not trigger file-level pruning here
        // (the file has no statistics attached), so all rows are read even
        // though the partition value does not match.
        let expr = col("part").eq(lit(2));
        let predicate = logical2physical(&expr, &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 1);
        assert_eq!(num_rows, 3);

        // The same filter wrapped as a dynamic expression enables pruning
        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
        assert_eq!(num_batches, 0);
        assert_eq!(num_rows, 0);
    }

    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
        match metrics.sum_by_name(metric_name) {
            Some(v) => v.as_usize(),
            _ => {
                panic!(
                    "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
                );
            }
        }
    }

    #[tokio::test]
    async fn test_custom_schema_adapter_no_rewriter() {
        // A schema mapper that casts column `a` to UInt64 and fills in
        // column `b` (missing from the file) with 0.0
        #[derive(Debug, Clone)]
        struct CustomSchemaMapper;

        impl SchemaMapper for CustomSchemaMapper {
            fn map_batch(
                &self,
                batch: arrow::array::RecordBatch,
            ) -> datafusion_common::Result<arrow::array::RecordBatch> {
                let a_column = cast(batch.column(0), &DataType::UInt64)?;
                // Fill in the missing column `b` with zeros
                let b_column =
                    arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]);
                let columns = vec![a_column, Arc::new(b_column)];
                let new_schema = Arc::new(Schema::new(vec![
                    Field::new("a", DataType::UInt64, false),
                    Field::new("b", DataType::Float64, false),
                ]));
                Ok(arrow::record_batch::RecordBatch::try_new(
                    new_schema, columns,
                )?)
            }

            fn map_column_statistics(
                &self,
                file_col_statistics: &[ColumnStatistics],
            ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
                Ok(vec![
                    file_col_statistics[0].clone(),
                    ColumnStatistics::new_unknown(),
                ])
            }
        }

        #[derive(Debug, Clone)]
        struct CustomSchemaAdapter;

        impl SchemaAdapter for CustomSchemaAdapter {
            fn map_schema(
                &self,
                _file_schema: &Schema,
            ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
            {
                let mapper = Arc::new(CustomSchemaMapper);
                // Only column `a` is projected from the file
                let projection = vec![0];
                Ok((mapper, projection))
            }

            fn map_column_index(
                &self,
                index: usize,
                file_schema: &Schema,
            ) -> Option<usize> {
                if index < file_schema.fields().len() {
                    Some(index)
                } else {
                    None
                }
            }
        }

        #[derive(Debug, Clone)]
        struct CustomSchemaAdapterFactory;

        impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
            fn create(
                &self,
                _projected_table_schema: SchemaRef,
                _table_schema: SchemaRef,
            ) -> Box<dyn SchemaAdapter> {
                Box::new(CustomSchemaAdapter)
            }
        }

        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
        let data_size =
            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
        let file = PartitionedFile::new(
            "test.parquet".to_string(),
            u64::try_from(data_size).unwrap(),
        );
        let table_schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::UInt64, false),
            Field::new("b", DataType::Float64, false),
        ]));

        let make_opener = |predicate| ParquetOpener {
            partition_index: 0,
            projection: Arc::new([0, 1]),
            batch_size: 1024,
            limit: None,
            predicate: Some(predicate),
            logical_file_schema: Arc::clone(&table_schema),
            metadata_size_hint: None,
            metrics: ExecutionPlanMetricsSet::new(),
            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
                Arc::clone(&store),
            )),
            partition_fields: vec![],
            pushdown_filters: true,
            reorder_filters: false,
            enable_page_index: false,
            enable_bloom_filter: false,
            schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
            enable_row_group_stats_pruning: false,
            coerce_int96: None,
            #[cfg(feature = "parquet_encryption")]
            file_decryption_properties: None,
            expr_adapter_factory: None,
            #[cfg(feature = "parquet_encryption")]
            encryption_factory: None,
            max_predicate_cache_size: None,
        };

        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
        let opener = make_opener(predicate);
        let stream = opener.open(file.clone()).unwrap().await.unwrap();
        let batches = collect_batches(stream).await;

        #[rustfmt::skip]
        let expected = [
            "+---+-----+",
            "| a | b   |",
            "+---+-----+",
            "| 1 | 0.0 |",
            "+---+-----+",
        ];
        assert_batches_eq!(expected, &batches);
        let metrics = opener.metrics.clone_inner();
        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
    }
}