datafusion_datasource_parquet/
opener.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetOpener`] for opening Parquet files
19
20use crate::page_filter::PagePruningAccessPlanFilter;
21use crate::row_group_filter::RowGroupAccessPlanFilter;
22use crate::{
23    apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
24    ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
25};
26use arrow::array::RecordBatch;
27use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
28use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use arrow::datatypes::{FieldRef, SchemaRef, TimeUnit};
34use datafusion_common::encryption::FileDecryptionProperties;
35
36use datafusion_common::{exec_err, DataFusionError, Result};
37use datafusion_datasource::PartitionedFile;
38use datafusion_physical_expr::simplifier::PhysicalExprSimplifier;
39use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
40use datafusion_physical_expr_common::physical_expr::{
41    is_dynamic_physical_expr, PhysicalExpr,
42};
43use datafusion_physical_plan::metrics::{
44    Count, ExecutionPlanMetricsSet, MetricBuilder, PruningMetrics,
45};
46use datafusion_pruning::{build_pruning_predicate, FilePruner, PruningPredicate};
47
48#[cfg(feature = "parquet_encryption")]
49use datafusion_common::config::EncryptionFactoryOptions;
50#[cfg(feature = "parquet_encryption")]
51use datafusion_execution::parquet_encryption::EncryptionFactory;
52use futures::{ready, Stream, StreamExt, TryStreamExt};
53use itertools::Itertools;
54use log::debug;
55use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
56use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
57use parquet::arrow::async_reader::AsyncFileReader;
58use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
59use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
60
61/// Implements [`FileOpener`] for a parquet file
62pub(super) struct ParquetOpener {
63    /// Execution partition index
64    pub partition_index: usize,
65    /// Column indexes in `table_schema` needed by the query
66    pub projection: Arc<[usize]>,
67    /// Target number of rows in each output RecordBatch
68    pub batch_size: usize,
69    /// Optional limit on the number of rows to read
70    pub limit: Option<usize>,
71    /// Optional predicate to apply during the scan
72    pub predicate: Option<Arc<dyn PhysicalExpr>>,
73    /// Schema of the output table without partition columns.
74    /// This is the schema we coerce the physical file schema into.
75    pub logical_file_schema: SchemaRef,
76    /// Partition columns
77    pub partition_fields: Vec<FieldRef>,
78    /// Optional hint for how large the initial request to read parquet metadata
79    /// should be
80    pub metadata_size_hint: Option<usize>,
81    /// Metrics for reporting
82    pub metrics: ExecutionPlanMetricsSet,
83    /// Factory for instantiating parquet reader
84    pub parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
85    /// Should the filters be evaluated during the parquet scan using
86    /// [`DataFusionArrowPredicate`](row_filter::DatafusionArrowPredicate)?
87    pub pushdown_filters: bool,
88    /// Should the filters be reordered to optimize the scan?
89    pub reorder_filters: bool,
90    /// Should the page index be read from parquet files, if present, to skip
91    /// data pages
92    pub enable_page_index: bool,
93    /// Should the bloom filter be read from parquet, if present, to skip row
94    /// groups
95    pub enable_bloom_filter: bool,
96    /// Schema adapter factory
97    pub schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
98    /// Should row group pruning be applied
99    pub enable_row_group_stats_pruning: bool,
100    /// Coerce INT96 timestamps to specific TimeUnit
101    pub coerce_int96: Option<TimeUnit>,
102    /// Optional parquet FileDecryptionProperties
103    #[cfg(feature = "parquet_encryption")]
104    pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
105    /// Rewrite expressions in the context of the file schema
106    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
107    /// Optional factory to create file decryption properties dynamically
108    #[cfg(feature = "parquet_encryption")]
109    pub encryption_factory:
110        Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
111    /// Maximum size of the predicate cache, in bytes. If none, uses
112    /// the arrow-rs default.
113    pub max_predicate_cache_size: Option<usize>,
114}
115
116impl FileOpener for ParquetOpener {
117    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
118        let file_range = partitioned_file.range.clone();
119        let extensions = partitioned_file.extensions.clone();
120        let file_location = partitioned_file.object_meta.location.clone();
121        let file_name = file_location.to_string();
122        let file_metrics =
123            ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
124
125        let metadata_size_hint = partitioned_file
126            .metadata_size_hint
127            .or(self.metadata_size_hint);
128
129        let mut async_file_reader: Box<dyn AsyncFileReader> =
130            self.parquet_file_reader_factory.create_reader(
131                self.partition_index,
132                partitioned_file.clone(),
133                metadata_size_hint,
134                &self.metrics,
135            )?;
136
137        let batch_size = self.batch_size;
138
139        let projected_schema =
140            SchemaRef::from(self.logical_file_schema.project(&self.projection)?);
141        let schema_adapter_factory = Arc::clone(&self.schema_adapter_factory);
142        let schema_adapter = self
143            .schema_adapter_factory
144            .create(projected_schema, Arc::clone(&self.logical_file_schema));
145        let mut predicate = self.predicate.clone();
146        let logical_file_schema = Arc::clone(&self.logical_file_schema);
147        let partition_fields = self.partition_fields.clone();
148        let reorder_predicates = self.reorder_filters;
149        let pushdown_filters = self.pushdown_filters;
150        let coerce_int96 = self.coerce_int96;
151        let enable_bloom_filter = self.enable_bloom_filter;
152        let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
153        let limit = self.limit;
154
155        let predicate_creation_errors = MetricBuilder::new(&self.metrics)
156            .global_counter("num_predicate_creation_errors");
157
158        let expr_adapter_factory = self.expr_adapter_factory.clone();
159        let mut predicate_file_schema = Arc::clone(&self.logical_file_schema);
160
161        let enable_page_index = self.enable_page_index;
162        #[cfg(feature = "parquet_encryption")]
163        let encryption_context = self.get_encryption_context();
164        let max_predicate_cache_size = self.max_predicate_cache_size;
165
166        Ok(Box::pin(async move {
167            #[cfg(feature = "parquet_encryption")]
168            let file_decryption_properties = encryption_context
169                .get_file_decryption_properties(&file_location)
170                .await?;
171
172            // Prune this file using the file level statistics and partition values.
173            // Since dynamic filters may have been updated since planning it is possible that we are able
174            // to prune files now that we couldn't prune at planning time.
175            // It is assumed that there is no point in doing pruning here if the predicate is not dynamic,
176            // as it would have been done at planning time.
177            // We'll also check this after every record batch we read,
178            // and if at some point we are able to prove we can prune the file using just the file level statistics
179            // we can end the stream early.
180            let mut file_pruner = predicate
181                .as_ref()
182                .map(|p| {
183                    Ok::<_, DataFusionError>(
184                        (is_dynamic_physical_expr(p) | partitioned_file.has_statistics())
185                            .then_some(FilePruner::new(
186                                Arc::clone(p),
187                                &logical_file_schema,
188                                partition_fields.clone(),
189                                partitioned_file.clone(),
190                                predicate_creation_errors.clone(),
191                            )?),
192                    )
193                })
194                .transpose()?
195                .flatten();
196
197            if let Some(file_pruner) = &mut file_pruner {
198                if file_pruner.should_prune()? {
199                    // Return an empty stream immediately to skip the work of setting up the actual stream
200                    file_metrics.files_ranges_pruned_statistics.add_pruned(1);
201                    return Ok(futures::stream::empty().boxed());
202                }
203            }
204
205            file_metrics.files_ranges_pruned_statistics.add_matched(1);
206
207            // Don't load the page index yet. Since it is not stored inline in
208            // the footer, loading the page index if it is not needed will do
209            // unnecessary I/O. We decide later if it is needed to evaluate the
210            // pruning predicates. Thus default to not requesting if from the
211            // underlying reader.
212            let mut options = ArrowReaderOptions::new().with_page_index(false);
213            #[cfg(feature = "parquet_encryption")]
214            if let Some(fd_val) = file_decryption_properties {
215                options = options.with_file_decryption_properties(Arc::clone(&fd_val));
216            }
217            let mut metadata_timer = file_metrics.metadata_load_time.timer();
218
219            // Begin by loading the metadata from the underlying reader (note
220            // the returned metadata may actually include page indexes as some
221            // readers may return page indexes even when not requested -- for
222            // example when they are cached)
223            let mut reader_metadata =
224                ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
225                    .await?;
226
227            // Note about schemas: we are actually dealing with **3 different schemas** here:
228            // - The table schema as defined by the TableProvider.
229            //   This is what the user sees, what they get when they `SELECT * FROM table`, etc.
230            // - The logical file schema: this is the table schema minus any hive partition columns and projections.
231            //   This is what the physicalfile schema is coerced to.
232            // - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
233            let mut physical_file_schema = Arc::clone(reader_metadata.schema());
234
235            // The schema loaded from the file may not be the same as the
236            // desired schema (for example if we want to instruct the parquet
237            // reader to read strings using Utf8View instead). Update if necessary
238            if let Some(merged) = apply_file_schema_type_coercions(
239                &logical_file_schema,
240                &physical_file_schema,
241            ) {
242                physical_file_schema = Arc::new(merged);
243                options = options.with_schema(Arc::clone(&physical_file_schema));
244                reader_metadata = ArrowReaderMetadata::try_new(
245                    Arc::clone(reader_metadata.metadata()),
246                    options.clone(),
247                )?;
248            }
249
250            if let Some(ref coerce) = coerce_int96 {
251                if let Some(merged) = coerce_int96_to_resolution(
252                    reader_metadata.parquet_schema(),
253                    &physical_file_schema,
254                    coerce,
255                ) {
256                    physical_file_schema = Arc::new(merged);
257                    options = options.with_schema(Arc::clone(&physical_file_schema));
258                    reader_metadata = ArrowReaderMetadata::try_new(
259                        Arc::clone(reader_metadata.metadata()),
260                        options.clone(),
261                    )?;
262                }
263            }
264
265            // Adapt the predicate to the physical file schema.
266            // This evaluates missing columns and inserts any necessary casts.
267            if let Some(expr_adapter_factory) = expr_adapter_factory {
268                predicate = predicate
269                    .map(|p| {
270                        let partition_values = partition_fields
271                            .iter()
272                            .cloned()
273                            .zip(partitioned_file.partition_values)
274                            .collect_vec();
275                        let expr = expr_adapter_factory
276                            .create(
277                                Arc::clone(&logical_file_schema),
278                                Arc::clone(&physical_file_schema),
279                            )
280                            .with_partition_values(partition_values)
281                            .rewrite(p)?;
282                        // After rewriting to the file schema, further simplifications may be possible.
283                        // For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE`
284                        // and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.).
285                        PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr)
286                    })
287                    .transpose()?;
288                predicate_file_schema = Arc::clone(&physical_file_schema);
289            }
290
291            // Build predicates for this specific file
292            let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
293                predicate.as_ref(),
294                &predicate_file_schema,
295                &predicate_creation_errors,
296            );
297
298            // The page index is not stored inline in the parquet footer so the
299            // code above may not have read the page index structures yet. If we
300            // need them for reading and they aren't yet loaded, we need to load them now.
301            if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
302                reader_metadata = load_page_index(
303                    reader_metadata,
304                    &mut async_file_reader,
305                    // Since we're manually loading the page index the option here should not matter but we pass it in for consistency
306                    options.with_page_index(true),
307                )
308                .await?;
309            }
310
311            metadata_timer.stop();
312
313            let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
314                async_file_reader,
315                reader_metadata,
316            );
317
318            let (schema_mapping, adapted_projections) =
319                schema_adapter.map_schema(&physical_file_schema)?;
320
321            let mask = ProjectionMask::roots(
322                builder.parquet_schema(),
323                adapted_projections.iter().cloned(),
324            );
325
326            // Filter pushdown: evaluate predicates during scan
327            if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
328                let row_filter = row_filter::build_row_filter(
329                    &predicate,
330                    &physical_file_schema,
331                    &predicate_file_schema,
332                    builder.metadata(),
333                    reorder_predicates,
334                    &file_metrics,
335                    &schema_adapter_factory,
336                );
337
338                match row_filter {
339                    Ok(Some(filter)) => {
340                        builder = builder.with_row_filter(filter);
341                    }
342                    Ok(None) => {}
343                    Err(e) => {
344                        debug!(
345                            "Ignoring error building row filter for '{predicate:?}': {e}"
346                        );
347                    }
348                };
349            };
350
351            // Determine which row groups to actually read. The idea is to skip
352            // as many row groups as possible based on the metadata and query
353            let file_metadata = Arc::clone(builder.metadata());
354            let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
355            let rg_metadata = file_metadata.row_groups();
356            // track which row groups to actually read
357            let access_plan =
358                create_initial_plan(&file_name, extensions, rg_metadata.len())?;
359            let mut row_groups = RowGroupAccessPlanFilter::new(access_plan);
360            // if there is a range restricting what parts of the file to read
361            if let Some(range) = file_range.as_ref() {
362                row_groups.prune_by_range(rg_metadata, range);
363            }
364
365            // If there is a predicate that can be evaluated against the metadata
366            if let Some(predicate) = predicate.as_ref() {
367                if enable_row_group_stats_pruning {
368                    row_groups.prune_by_statistics(
369                        &physical_file_schema,
370                        builder.parquet_schema(),
371                        rg_metadata,
372                        predicate,
373                        &file_metrics,
374                    );
375                } else {
376                    // Update metrics: statistics unavailable, so all row groups are
377                    // matched (not pruned)
378                    file_metrics
379                        .row_groups_pruned_statistics
380                        .add_matched(row_groups.remaining_row_group_count());
381                }
382
383                if enable_bloom_filter && !row_groups.is_empty() {
384                    row_groups
385                        .prune_by_bloom_filters(
386                            &physical_file_schema,
387                            &mut builder,
388                            predicate,
389                            &file_metrics,
390                        )
391                        .await;
392                } else {
393                    // Update metrics: bloom filter unavailable, so all row groups are
394                    // matched (not pruned)
395                    file_metrics
396                        .row_groups_pruned_bloom_filter
397                        .add_matched(row_groups.remaining_row_group_count());
398                }
399            } else {
400                // Update metrics: no predicate, so all row groups are matched (not pruned)
401                let n_remaining_row_groups = row_groups.remaining_row_group_count();
402                file_metrics
403                    .row_groups_pruned_statistics
404                    .add_matched(n_remaining_row_groups);
405                file_metrics
406                    .row_groups_pruned_bloom_filter
407                    .add_matched(n_remaining_row_groups);
408            }
409
410            let mut access_plan = row_groups.build();
411
412            // page index pruning: if all data on individual pages can
413            // be ruled using page metadata, rows from other columns
414            // with that range can be skipped as well
415            if enable_page_index && !access_plan.is_empty() {
416                if let Some(p) = page_pruning_predicate {
417                    access_plan = p.prune_plan_with_page_index(
418                        access_plan,
419                        &physical_file_schema,
420                        builder.parquet_schema(),
421                        file_metadata.as_ref(),
422                        &file_metrics,
423                    );
424                }
425            }
426
427            let row_group_indexes = access_plan.row_group_indexes();
428            if let Some(row_selection) =
429                access_plan.into_overall_row_selection(rg_metadata)?
430            {
431                builder = builder.with_row_selection(row_selection);
432            }
433
434            if let Some(limit) = limit {
435                builder = builder.with_limit(limit)
436            }
437
438            if let Some(max_predicate_cache_size) = max_predicate_cache_size {
439                builder = builder.with_max_predicate_cache_size(max_predicate_cache_size);
440            }
441
442            // metrics from the arrow reader itself
443            let arrow_reader_metrics = ArrowReaderMetrics::enabled();
444
445            let stream = builder
446                .with_projection(mask)
447                .with_batch_size(batch_size)
448                .with_row_groups(row_group_indexes)
449                .with_metrics(arrow_reader_metrics.clone())
450                .build()?;
451
452            let files_ranges_pruned_statistics =
453                file_metrics.files_ranges_pruned_statistics.clone();
454            let predicate_cache_inner_records =
455                file_metrics.predicate_cache_inner_records.clone();
456            let predicate_cache_records = file_metrics.predicate_cache_records.clone();
457
458            let stream = stream.map_err(DataFusionError::from).map(move |b| {
459                b.and_then(|b| {
460                    copy_arrow_reader_metrics(
461                        &arrow_reader_metrics,
462                        &predicate_cache_inner_records,
463                        &predicate_cache_records,
464                    );
465                    schema_mapping.map_batch(b)
466                })
467            });
468
469            if let Some(file_pruner) = file_pruner {
470                Ok(EarlyStoppingStream::new(
471                    stream,
472                    file_pruner,
473                    files_ranges_pruned_statistics,
474                )
475                .boxed())
476            } else {
477                Ok(stream.boxed())
478            }
479        }))
480    }
481}
482
483/// Copies metrics from ArrowReaderMetrics (the metrics collected by the
484/// arrow-rs parquet reader) to the parquet file metrics for DataFusion
485fn copy_arrow_reader_metrics(
486    arrow_reader_metrics: &ArrowReaderMetrics,
487    predicate_cache_inner_records: &Count,
488    predicate_cache_records: &Count,
489) {
490    if let Some(v) = arrow_reader_metrics.records_read_from_inner() {
491        predicate_cache_inner_records.add(v);
492    }
493
494    if let Some(v) = arrow_reader_metrics.records_read_from_cache() {
495        predicate_cache_records.add(v);
496    }
497}
498
499/// Wraps an inner RecordBatchStream and a [`FilePruner`]
500///
501/// This can terminate the scan early when some dynamic filters is updated after
502/// the scan starts, so we discover after the scan starts that the file can be
503/// pruned (can't have matching rows).
504struct EarlyStoppingStream<S> {
505    /// Has the stream finished processing? All subsequent polls will return
506    /// None
507    done: bool,
508    file_pruner: FilePruner,
509    files_ranges_pruned_statistics: PruningMetrics,
510    /// The inner stream
511    inner: S,
512}
513
514impl<S> EarlyStoppingStream<S> {
515    pub fn new(
516        stream: S,
517        file_pruner: FilePruner,
518        files_ranges_pruned_statistics: PruningMetrics,
519    ) -> Self {
520        Self {
521            done: false,
522            inner: stream,
523            file_pruner,
524            files_ranges_pruned_statistics,
525        }
526    }
527}
528impl<S> EarlyStoppingStream<S>
529where
530    S: Stream<Item = Result<RecordBatch>> + Unpin,
531{
532    fn check_prune(&mut self, input: Result<RecordBatch>) -> Result<Option<RecordBatch>> {
533        let batch = input?;
534
535        // Since dynamic filters may have been updated, see if we can stop
536        // reading this stream entirely.
537        if self.file_pruner.should_prune()? {
538            self.files_ranges_pruned_statistics.add_pruned(1);
539            // Previously this file range has been counted as matched
540            self.files_ranges_pruned_statistics.subtract_matched(1);
541            self.done = true;
542            Ok(None)
543        } else {
544            // Return the adapted batch
545            Ok(Some(batch))
546        }
547    }
548}
549
550impl<S> Stream for EarlyStoppingStream<S>
551where
552    S: Stream<Item = Result<RecordBatch>> + Unpin,
553{
554    type Item = Result<RecordBatch>;
555
556    fn poll_next(
557        mut self: Pin<&mut Self>,
558        cx: &mut Context<'_>,
559    ) -> Poll<Option<Self::Item>> {
560        if self.done {
561            return Poll::Ready(None);
562        }
563        match ready!(self.inner.poll_next_unpin(cx)) {
564            None => {
565                // input done
566                self.done = true;
567                Poll::Ready(None)
568            }
569            Some(input_batch) => {
570                let output = self.check_prune(input_batch);
571                Poll::Ready(output.transpose())
572            }
573        }
574    }
575}
576
577#[derive(Default)]
578#[cfg_attr(not(feature = "parquet_encryption"), allow(dead_code))]
579struct EncryptionContext {
580    #[cfg(feature = "parquet_encryption")]
581    file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
582    #[cfg(feature = "parquet_encryption")]
583    encryption_factory: Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
584}
585
586#[cfg(feature = "parquet_encryption")]
587impl EncryptionContext {
588    fn new(
589        file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
590        encryption_factory: Option<(
591            Arc<dyn EncryptionFactory>,
592            EncryptionFactoryOptions,
593        )>,
594    ) -> Self {
595        Self {
596            file_decryption_properties,
597            encryption_factory,
598        }
599    }
600
601    async fn get_file_decryption_properties(
602        &self,
603        file_location: &object_store::path::Path,
604    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
605        match &self.file_decryption_properties {
606            Some(file_decryption_properties) => {
607                Ok(Some(Arc::clone(file_decryption_properties)))
608            }
609            None => match &self.encryption_factory {
610                Some((encryption_factory, encryption_config)) => Ok(encryption_factory
611                    .get_file_decryption_properties(encryption_config, file_location)
612                    .await?),
613                None => Ok(None),
614            },
615        }
616    }
617}
618
619#[cfg(not(feature = "parquet_encryption"))]
620#[allow(dead_code)]
621impl EncryptionContext {
622    async fn get_file_decryption_properties(
623        &self,
624        _file_location: &object_store::path::Path,
625    ) -> Result<Option<Arc<FileDecryptionProperties>>> {
626        Ok(None)
627    }
628}
629
630impl ParquetOpener {
631    #[cfg(feature = "parquet_encryption")]
632    fn get_encryption_context(&self) -> EncryptionContext {
633        EncryptionContext::new(
634            self.file_decryption_properties.clone(),
635            self.encryption_factory.clone(),
636        )
637    }
638
639    #[cfg(not(feature = "parquet_encryption"))]
640    #[allow(dead_code)]
641    fn get_encryption_context(&self) -> EncryptionContext {
642        EncryptionContext::default()
643    }
644}
645
646/// Return the initial [`ParquetAccessPlan`]
647///
648/// If the user has supplied one as an extension, use that
649/// otherwise return a plan that scans all row groups
650///
651/// Returns an error if an invalid `ParquetAccessPlan` is provided
652///
653/// Note: file_name is only used for error messages
654fn create_initial_plan(
655    file_name: &str,
656    extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
657    row_group_count: usize,
658) -> Result<ParquetAccessPlan> {
659    if let Some(extensions) = extensions {
660        if let Some(access_plan) = extensions.downcast_ref::<ParquetAccessPlan>() {
661            let plan_len = access_plan.len();
662            if plan_len != row_group_count {
663                return exec_err!(
664                    "Invalid ParquetAccessPlan for {file_name}. Specified {plan_len} row groups, but file has {row_group_count}"
665                );
666            }
667
668            // check row group count matches the plan
669            return Ok(access_plan.clone());
670        } else {
671            debug!("DataSourceExec Ignoring unknown extension specified for {file_name}");
672        }
673    }
674
675    // default to scanning all row groups
676    Ok(ParquetAccessPlan::new_all(row_group_count))
677}
678
679/// Build a page pruning predicate from an optional predicate expression.
680/// If the predicate is None or the predicate cannot be converted to a page pruning
681/// predicate, return None.
682pub(crate) fn build_page_pruning_predicate(
683    predicate: &Arc<dyn PhysicalExpr>,
684    file_schema: &SchemaRef,
685) -> Arc<PagePruningAccessPlanFilter> {
686    Arc::new(PagePruningAccessPlanFilter::new(
687        predicate,
688        Arc::clone(file_schema),
689    ))
690}
691
692pub(crate) fn build_pruning_predicates(
693    predicate: Option<&Arc<dyn PhysicalExpr>>,
694    file_schema: &SchemaRef,
695    predicate_creation_errors: &Count,
696) -> (
697    Option<Arc<PruningPredicate>>,
698    Option<Arc<PagePruningAccessPlanFilter>>,
699) {
700    let Some(predicate) = predicate.as_ref() else {
701        return (None, None);
702    };
703    let pruning_predicate = build_pruning_predicate(
704        Arc::clone(predicate),
705        file_schema,
706        predicate_creation_errors,
707    );
708    let page_pruning_predicate = build_page_pruning_predicate(predicate, file_schema);
709    (pruning_predicate, Some(page_pruning_predicate))
710}
711
712/// Returns a `ArrowReaderMetadata` with the page index loaded, loading
713/// it from the underlying `AsyncFileReader` if necessary.
714async fn load_page_index<T: AsyncFileReader>(
715    reader_metadata: ArrowReaderMetadata,
716    input: &mut T,
717    options: ArrowReaderOptions,
718) -> Result<ArrowReaderMetadata> {
719    let parquet_metadata = reader_metadata.metadata();
720    let missing_column_index = parquet_metadata.column_index().is_none();
721    let missing_offset_index = parquet_metadata.offset_index().is_none();
722    // You may ask yourself: why are we even checking if the page index is already loaded here?
723    // Didn't we explicitly *not* load it above?
724    // Well it's possible that a custom implementation of `AsyncFileReader` gives you
725    // the page index even if you didn't ask for it (e.g. because it's cached)
726    // so it's important to check that here to avoid extra work.
727    if missing_column_index || missing_offset_index {
728        let m = Arc::try_unwrap(Arc::clone(parquet_metadata))
729            .unwrap_or_else(|e| e.as_ref().clone());
730        let mut reader = ParquetMetaDataReader::new_with_metadata(m)
731            .with_page_index_policy(PageIndexPolicy::Optional);
732        reader.load_page_index(input).await?;
733        let new_parquet_metadata = reader.finish()?;
734        let new_arrow_reader =
735            ArrowReaderMetadata::try_new(Arc::new(new_parquet_metadata), options)?;
736        Ok(new_arrow_reader)
737    } else {
738        // No need to load the page index again, just return the existing metadata
739        Ok(reader_metadata)
740    }
741}
742
743fn should_enable_page_index(
744    enable_page_index: bool,
745    page_pruning_predicate: &Option<Arc<PagePruningAccessPlanFilter>>,
746) -> bool {
747    enable_page_index
748        && page_pruning_predicate.is_some()
749        && page_pruning_predicate
750            .as_ref()
751            .map(|p| p.filter_number() > 0)
752            .unwrap_or(false)
753}
754
755#[cfg(test)]
756mod test {
757    use std::sync::Arc;
758
759    use arrow::{
760        compute::cast,
761        datatypes::{DataType, Field, Schema, SchemaRef},
762    };
763    use bytes::{BufMut, BytesMut};
764    use datafusion_common::{
765        assert_batches_eq, record_batch, stats::Precision, ColumnStatistics,
766        DataFusionError, ScalarValue, Statistics,
767    };
768    use datafusion_datasource::{
769        file_stream::FileOpener,
770        schema_adapter::{
771            DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
772            SchemaMapper,
773        },
774        PartitionedFile,
775    };
776    use datafusion_expr::{col, lit};
777    use datafusion_physical_expr::{
778        expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
779    };
780    use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
781    use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
782    use futures::{Stream, StreamExt};
783    use object_store::{memory::InMemory, path::Path, ObjectStore};
784    use parquet::arrow::ArrowWriter;
785
786    use crate::{opener::ParquetOpener, DefaultParquetFileReaderFactory};
787
788    async fn count_batches_and_rows(
789        mut stream: std::pin::Pin<
790            Box<
791                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
792                    + Send,
793            >,
794        >,
795    ) -> (usize, usize) {
796        let mut num_batches = 0;
797        let mut num_rows = 0;
798        while let Some(Ok(batch)) = stream.next().await {
799            num_rows += batch.num_rows();
800            num_batches += 1;
801        }
802        (num_batches, num_rows)
803    }
804
805    async fn collect_batches(
806        mut stream: std::pin::Pin<
807            Box<
808                dyn Stream<Item = Result<arrow::array::RecordBatch, DataFusionError>>
809                    + Send,
810            >,
811        >,
812    ) -> Vec<arrow::array::RecordBatch> {
813        let mut batches = vec![];
814        while let Some(Ok(batch)) = stream.next().await {
815            batches.push(batch);
816        }
817        batches
818    }
819
820    async fn write_parquet(
821        store: Arc<dyn ObjectStore>,
822        filename: &str,
823        batch: arrow::record_batch::RecordBatch,
824    ) -> usize {
825        let mut out = BytesMut::new().writer();
826        {
827            let mut writer =
828                ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
829            writer.write(&batch).unwrap();
830            writer.finish().unwrap();
831        }
832        let data = out.into_inner().freeze();
833        let data_len = data.len();
834        store.put(&Path::from(filename), data.into()).await.unwrap();
835        data_len
836    }
837
838    fn make_dynamic_expr(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
839        Arc::new(DynamicFilterPhysicalExpr::new(
840            expr.children().into_iter().map(Arc::clone).collect(),
841            expr,
842        ))
843    }
844
845    #[tokio::test]
846    async fn test_prune_on_statistics() {
847        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
848
849        let batch = record_batch!(
850            ("a", Int32, vec![Some(1), Some(2), Some(2)]),
851            ("b", Float32, vec![Some(1.0), Some(2.0), None])
852        )
853        .unwrap();
854
855        let data_size =
856            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
857
858        let schema = batch.schema();
859        let file = PartitionedFile::new(
860            "test.parquet".to_string(),
861            u64::try_from(data_size).unwrap(),
862        )
863        .with_statistics(Arc::new(
864            Statistics::new_unknown(&schema)
865                .add_column_statistics(ColumnStatistics::new_unknown())
866                .add_column_statistics(
867                    ColumnStatistics::new_unknown()
868                        .with_min_value(Precision::Exact(ScalarValue::Float32(Some(1.0))))
869                        .with_max_value(Precision::Exact(ScalarValue::Float32(Some(2.0))))
870                        .with_null_count(Precision::Exact(1)),
871                ),
872        ));
873
874        let make_opener = |predicate| {
875            ParquetOpener {
876                partition_index: 0,
877                projection: Arc::new([0, 1]),
878                batch_size: 1024,
879                limit: None,
880                predicate: Some(predicate),
881                logical_file_schema: schema.clone(),
882                metadata_size_hint: None,
883                metrics: ExecutionPlanMetricsSet::new(),
884                parquet_file_reader_factory: Arc::new(
885                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
886                ),
887                partition_fields: vec![],
888                pushdown_filters: false, // note that this is false!
889                reorder_filters: false,
890                enable_page_index: false,
891                enable_bloom_filter: false,
892                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
893                enable_row_group_stats_pruning: true,
894                coerce_int96: None,
895                #[cfg(feature = "parquet_encryption")]
896                file_decryption_properties: None,
897                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
898                #[cfg(feature = "parquet_encryption")]
899                encryption_factory: None,
900                max_predicate_cache_size: None,
901            }
902        };
903
904        // A filter on "a" should not exclude any rows even if it matches the data
905        let expr = col("a").eq(lit(1));
906        let predicate = logical2physical(&expr, &schema);
907        let opener = make_opener(predicate);
908        let stream = opener.open(file.clone()).unwrap().await.unwrap();
909        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
910        assert_eq!(num_batches, 1);
911        assert_eq!(num_rows, 3);
912
913        // A filter on `b = 5.0` should exclude all rows
914        let expr = col("b").eq(lit(ScalarValue::Float32(Some(5.0))));
915        let predicate = logical2physical(&expr, &schema);
916        let opener = make_opener(predicate);
917        let stream = opener.open(file).unwrap().await.unwrap();
918        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
919        assert_eq!(num_batches, 0);
920        assert_eq!(num_rows, 0);
921    }
922
923    #[tokio::test]
924    async fn test_prune_on_partition_statistics_with_dynamic_expression() {
925        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
926
927        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
928        let data_size =
929            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
930
931        let file_schema = batch.schema();
932        let mut file = PartitionedFile::new(
933            "part=1/file.parquet".to_string(),
934            u64::try_from(data_size).unwrap(),
935        );
936        file.partition_values = vec![ScalarValue::Int32(Some(1))];
937
938        let table_schema = Arc::new(Schema::new(vec![
939            Field::new("part", DataType::Int32, false),
940            Field::new("a", DataType::Int32, false),
941        ]));
942
943        let make_opener = |predicate| {
944            ParquetOpener {
945                partition_index: 0,
946                projection: Arc::new([0]),
947                batch_size: 1024,
948                limit: None,
949                predicate: Some(predicate),
950                logical_file_schema: file_schema.clone(),
951                metadata_size_hint: None,
952                metrics: ExecutionPlanMetricsSet::new(),
953                parquet_file_reader_factory: Arc::new(
954                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
955                ),
956                partition_fields: vec![Arc::new(Field::new(
957                    "part",
958                    DataType::Int32,
959                    false,
960                ))],
961                pushdown_filters: false, // note that this is false!
962                reorder_filters: false,
963                enable_page_index: false,
964                enable_bloom_filter: false,
965                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
966                enable_row_group_stats_pruning: true,
967                coerce_int96: None,
968                #[cfg(feature = "parquet_encryption")]
969                file_decryption_properties: None,
970                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
971                #[cfg(feature = "parquet_encryption")]
972                encryption_factory: None,
973                max_predicate_cache_size: None,
974            }
975        };
976
977        // Filter should match the partition value
978        let expr = col("part").eq(lit(1));
979        // Mark the expression as dynamic even if it's not to force partition pruning to happen
980        // Otherwise we assume it already happened at the planning stage and won't re-do the work here
981        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
982        let opener = make_opener(predicate);
983        let stream = opener.open(file.clone()).unwrap().await.unwrap();
984        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
985        assert_eq!(num_batches, 1);
986        assert_eq!(num_rows, 3);
987
988        // Filter should not match the partition value
989        let expr = col("part").eq(lit(2));
990        // Mark the expression as dynamic even if it's not to force partition pruning to happen
991        // Otherwise we assume it already happened at the planning stage and won't re-do the work here
992        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
993        let opener = make_opener(predicate);
994        let stream = opener.open(file).unwrap().await.unwrap();
995        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
996        assert_eq!(num_batches, 0);
997        assert_eq!(num_rows, 0);
998    }
999
1000    #[tokio::test]
1001    async fn test_prune_on_partition_values_and_file_statistics() {
1002        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1003
1004        let batch = record_batch!(
1005            ("a", Int32, vec![Some(1), Some(2), Some(3)]),
1006            ("b", Float64, vec![Some(1.0), Some(2.0), None])
1007        )
1008        .unwrap();
1009        let data_size =
1010            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
1011        let file_schema = batch.schema();
1012        let mut file = PartitionedFile::new(
1013            "part=1/file.parquet".to_string(),
1014            u64::try_from(data_size).unwrap(),
1015        );
1016        file.partition_values = vec![ScalarValue::Int32(Some(1))];
1017        file.statistics = Some(Arc::new(
1018            Statistics::new_unknown(&file_schema)
1019                .add_column_statistics(ColumnStatistics::new_unknown())
1020                .add_column_statistics(
1021                    ColumnStatistics::new_unknown()
1022                        .with_min_value(Precision::Exact(ScalarValue::Float64(Some(1.0))))
1023                        .with_max_value(Precision::Exact(ScalarValue::Float64(Some(2.0))))
1024                        .with_null_count(Precision::Exact(1)),
1025                ),
1026        ));
1027        let table_schema = Arc::new(Schema::new(vec![
1028            Field::new("part", DataType::Int32, false),
1029            Field::new("a", DataType::Int32, false),
1030            Field::new("b", DataType::Float32, true),
1031        ]));
1032        let make_opener = |predicate| {
1033            ParquetOpener {
1034                partition_index: 0,
1035                projection: Arc::new([0]),
1036                batch_size: 1024,
1037                limit: None,
1038                predicate: Some(predicate),
1039                logical_file_schema: file_schema.clone(),
1040                metadata_size_hint: None,
1041                metrics: ExecutionPlanMetricsSet::new(),
1042                parquet_file_reader_factory: Arc::new(
1043                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
1044                ),
1045                partition_fields: vec![Arc::new(Field::new(
1046                    "part",
1047                    DataType::Int32,
1048                    false,
1049                ))],
1050                pushdown_filters: false, // note that this is false!
1051                reorder_filters: false,
1052                enable_page_index: false,
1053                enable_bloom_filter: false,
1054                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1055                enable_row_group_stats_pruning: true,
1056                coerce_int96: None,
1057                #[cfg(feature = "parquet_encryption")]
1058                file_decryption_properties: None,
1059                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1060                #[cfg(feature = "parquet_encryption")]
1061                encryption_factory: None,
1062                max_predicate_cache_size: None,
1063            }
1064        };
1065
1066        // Filter should match the partition value and file statistics
1067        let expr = col("part").eq(lit(1)).and(col("b").eq(lit(1.0)));
1068        let predicate = logical2physical(&expr, &table_schema);
1069        let opener = make_opener(predicate);
1070        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1071        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1072        assert_eq!(num_batches, 1);
1073        assert_eq!(num_rows, 3);
1074
1075        // Should prune based on partition value but not file statistics
1076        let expr = col("part").eq(lit(2)).and(col("b").eq(lit(1.0)));
1077        let predicate = logical2physical(&expr, &table_schema);
1078        let opener = make_opener(predicate);
1079        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1080        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1081        assert_eq!(num_batches, 0);
1082        assert_eq!(num_rows, 0);
1083
1084        // Should prune based on file statistics but not partition value
1085        let expr = col("part").eq(lit(1)).and(col("b").eq(lit(7.0)));
1086        let predicate = logical2physical(&expr, &table_schema);
1087        let opener = make_opener(predicate);
1088        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1089        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1090        assert_eq!(num_batches, 0);
1091        assert_eq!(num_rows, 0);
1092
1093        // Should prune based on both partition value and file statistics
1094        let expr = col("part").eq(lit(2)).and(col("b").eq(lit(7.0)));
1095        let predicate = logical2physical(&expr, &table_schema);
1096        let opener = make_opener(predicate);
1097        let stream = opener.open(file).unwrap().await.unwrap();
1098        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1099        assert_eq!(num_batches, 0);
1100        assert_eq!(num_rows, 0);
1101    }
1102
1103    #[tokio::test]
1104    async fn test_prune_on_partition_value_and_data_value() {
1105        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1106
1107        // Note: number 3 is missing!
1108        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(4)])).unwrap();
1109        let data_size =
1110            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
1111
1112        let file_schema = batch.schema();
1113        let mut file = PartitionedFile::new(
1114            "part=1/file.parquet".to_string(),
1115            u64::try_from(data_size).unwrap(),
1116        );
1117        file.partition_values = vec![ScalarValue::Int32(Some(1))];
1118
1119        let table_schema = Arc::new(Schema::new(vec![
1120            Field::new("part", DataType::Int32, false),
1121            Field::new("a", DataType::Int32, false),
1122        ]));
1123
1124        let make_opener = |predicate| {
1125            ParquetOpener {
1126                partition_index: 0,
1127                projection: Arc::new([0]),
1128                batch_size: 1024,
1129                limit: None,
1130                predicate: Some(predicate),
1131                logical_file_schema: file_schema.clone(),
1132                metadata_size_hint: None,
1133                metrics: ExecutionPlanMetricsSet::new(),
1134                parquet_file_reader_factory: Arc::new(
1135                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
1136                ),
1137                partition_fields: vec![Arc::new(Field::new(
1138                    "part",
1139                    DataType::Int32,
1140                    false,
1141                ))],
1142                pushdown_filters: true, // note that this is true!
1143                reorder_filters: true,
1144                enable_page_index: false,
1145                enable_bloom_filter: false,
1146                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1147                enable_row_group_stats_pruning: false, // note that this is false!
1148                coerce_int96: None,
1149                #[cfg(feature = "parquet_encryption")]
1150                file_decryption_properties: None,
1151                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1152                #[cfg(feature = "parquet_encryption")]
1153                encryption_factory: None,
1154                max_predicate_cache_size: None,
1155            }
1156        };
1157
1158        // Filter should match the partition value and data value
1159        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(1)));
1160        let predicate = logical2physical(&expr, &table_schema);
1161        let opener = make_opener(predicate);
1162        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1163        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1164        assert_eq!(num_batches, 1);
1165        assert_eq!(num_rows, 3);
1166
1167        // Filter should match the partition value but not the data value
1168        let expr = col("part").eq(lit(1)).or(col("a").eq(lit(3)));
1169        let predicate = logical2physical(&expr, &table_schema);
1170        let opener = make_opener(predicate);
1171        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1172        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1173        assert_eq!(num_batches, 1);
1174        assert_eq!(num_rows, 3);
1175
1176        // Filter should not match the partition value but match the data value
1177        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(1)));
1178        let predicate = logical2physical(&expr, &table_schema);
1179        let opener = make_opener(predicate);
1180        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1181        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1182        assert_eq!(num_batches, 1);
1183        assert_eq!(num_rows, 1);
1184
1185        // Filter should not match the partition value or the data value
1186        let expr = col("part").eq(lit(2)).or(col("a").eq(lit(3)));
1187        let predicate = logical2physical(&expr, &table_schema);
1188        let opener = make_opener(predicate);
1189        let stream = opener.open(file).unwrap().await.unwrap();
1190        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1191        assert_eq!(num_batches, 0);
1192        assert_eq!(num_rows, 0);
1193    }
1194
1195    /// Test that if the filter is not a dynamic filter and we have no stats we don't do extra pruning work at the file level.
1196    #[tokio::test]
1197    async fn test_opener_pruning_skipped_on_static_filters() {
1198        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1199
1200        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
1201        let data_size =
1202            write_parquet(Arc::clone(&store), "part=1/file.parquet", batch.clone()).await;
1203
1204        let file_schema = batch.schema();
1205        let mut file = PartitionedFile::new(
1206            "part=1/file.parquet".to_string(),
1207            u64::try_from(data_size).unwrap(),
1208        );
1209        file.partition_values = vec![ScalarValue::Int32(Some(1))];
1210
1211        let table_schema = Arc::new(Schema::new(vec![
1212            Field::new("part", DataType::Int32, false),
1213            Field::new("a", DataType::Int32, false),
1214        ]));
1215
1216        let make_opener = |predicate| {
1217            ParquetOpener {
1218                partition_index: 0,
1219                projection: Arc::new([0]),
1220                batch_size: 1024,
1221                limit: None,
1222                predicate: Some(predicate),
1223                logical_file_schema: file_schema.clone(),
1224                metadata_size_hint: None,
1225                metrics: ExecutionPlanMetricsSet::new(),
1226                parquet_file_reader_factory: Arc::new(
1227                    DefaultParquetFileReaderFactory::new(Arc::clone(&store)),
1228                ),
1229                partition_fields: vec![Arc::new(Field::new(
1230                    "part",
1231                    DataType::Int32,
1232                    false,
1233                ))],
1234                pushdown_filters: false, // note that this is false!
1235                reorder_filters: false,
1236                enable_page_index: false,
1237                enable_bloom_filter: false,
1238                schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
1239                enable_row_group_stats_pruning: true,
1240                coerce_int96: None,
1241                #[cfg(feature = "parquet_encryption")]
1242                file_decryption_properties: None,
1243                expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1244                #[cfg(feature = "parquet_encryption")]
1245                encryption_factory: None,
1246                max_predicate_cache_size: None,
1247            }
1248        };
1249
1250        // Filter should NOT match the stats but the file is never attempted to be pruned because the filters are not dynamic
1251        let expr = col("part").eq(lit(2));
1252        let predicate = logical2physical(&expr, &table_schema);
1253        let opener = make_opener(predicate);
1254        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1255        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1256        assert_eq!(num_batches, 1);
1257        assert_eq!(num_rows, 3);
1258
1259        // If we make the filter dynamic, it should prune
1260        let predicate = make_dynamic_expr(logical2physical(&expr, &table_schema));
1261        let opener = make_opener(predicate);
1262        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1263        let (num_batches, num_rows) = count_batches_and_rows(stream).await;
1264        assert_eq!(num_batches, 0);
1265        assert_eq!(num_rows, 0);
1266    }
1267
1268    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
1269        match metrics.sum_by_name(metric_name) {
1270            Some(v) => v.as_usize(),
1271            _ => {
1272                panic!(
1273                    "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1274                );
1275            }
1276        }
1277    }
1278
1279    #[tokio::test]
1280    async fn test_custom_schema_adapter_no_rewriter() {
1281        // Make a hardcoded schema adapter that adds a new column "b" with default value 0.0
1282        // and converts the first column "a" from Int32 to UInt64.
1283        #[derive(Debug, Clone)]
1284        struct CustomSchemaMapper;
1285
1286        impl SchemaMapper for CustomSchemaMapper {
1287            fn map_batch(
1288                &self,
1289                batch: arrow::array::RecordBatch,
1290            ) -> datafusion_common::Result<arrow::array::RecordBatch> {
1291                let a_column = cast(batch.column(0), &DataType::UInt64)?;
1292                // Add in a new column "b" with default value 0.0
1293                let b_column =
1294                    arrow::array::Float64Array::from(vec![Some(0.0); batch.num_rows()]);
1295                let columns = vec![a_column, Arc::new(b_column)];
1296                let new_schema = Arc::new(Schema::new(vec![
1297                    Field::new("a", DataType::UInt64, false),
1298                    Field::new("b", DataType::Float64, false),
1299                ]));
1300                Ok(arrow::record_batch::RecordBatch::try_new(
1301                    new_schema, columns,
1302                )?)
1303            }
1304
1305            fn map_column_statistics(
1306                &self,
1307                file_col_statistics: &[ColumnStatistics],
1308            ) -> datafusion_common::Result<Vec<ColumnStatistics>> {
1309                Ok(vec![
1310                    file_col_statistics[0].clone(),
1311                    ColumnStatistics::new_unknown(),
1312                ])
1313            }
1314        }
1315
1316        #[derive(Debug, Clone)]
1317        struct CustomSchemaAdapter;
1318
1319        impl SchemaAdapter for CustomSchemaAdapter {
1320            fn map_schema(
1321                &self,
1322                _file_schema: &Schema,
1323            ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)>
1324            {
1325                let mapper = Arc::new(CustomSchemaMapper);
1326                let projection = vec![0]; // We only need to read the first column "a" from the file
1327                Ok((mapper, projection))
1328            }
1329
1330            fn map_column_index(
1331                &self,
1332                index: usize,
1333                file_schema: &Schema,
1334            ) -> Option<usize> {
1335                if index < file_schema.fields().len() {
1336                    Some(index)
1337                } else {
1338                    None // The new column "b" is not in the original schema
1339                }
1340            }
1341        }
1342
1343        #[derive(Debug, Clone)]
1344        struct CustomSchemaAdapterFactory;
1345
1346        impl SchemaAdapterFactory for CustomSchemaAdapterFactory {
1347            fn create(
1348                &self,
1349                _projected_table_schema: SchemaRef,
1350                _table_schema: SchemaRef,
1351            ) -> Box<dyn SchemaAdapter> {
1352                Box::new(CustomSchemaAdapter)
1353            }
1354        }
1355
1356        // Test that if no expression rewriter is provided we use a schemaadapter to adapt the data to the expression
1357        let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
1358        let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap();
1359        // Write out the batch to a Parquet file
1360        let data_size =
1361            write_parquet(Arc::clone(&store), "test.parquet", batch.clone()).await;
1362        let file = PartitionedFile::new(
1363            "test.parquet".to_string(),
1364            u64::try_from(data_size).unwrap(),
1365        );
1366        let table_schema = Arc::new(Schema::new(vec![
1367            Field::new("a", DataType::UInt64, false),
1368            Field::new("b", DataType::Float64, false),
1369        ]));
1370
1371        let make_opener = |predicate| ParquetOpener {
1372            partition_index: 0,
1373            projection: Arc::new([0, 1]),
1374            batch_size: 1024,
1375            limit: None,
1376            predicate: Some(predicate),
1377            logical_file_schema: Arc::clone(&table_schema),
1378            metadata_size_hint: None,
1379            metrics: ExecutionPlanMetricsSet::new(),
1380            parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(
1381                Arc::clone(&store),
1382            )),
1383            partition_fields: vec![],
1384            pushdown_filters: true,
1385            reorder_filters: false,
1386            enable_page_index: false,
1387            enable_bloom_filter: false,
1388            schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
1389            enable_row_group_stats_pruning: false,
1390            coerce_int96: None,
1391            #[cfg(feature = "parquet_encryption")]
1392            file_decryption_properties: None,
1393            expr_adapter_factory: None,
1394            #[cfg(feature = "parquet_encryption")]
1395            encryption_factory: None,
1396            max_predicate_cache_size: None,
1397        };
1398
1399        let predicate = logical2physical(&col("a").eq(lit(1u64)), &table_schema);
1400        let opener = make_opener(predicate);
1401        let stream = opener.open(file.clone()).unwrap().await.unwrap();
1402        let batches = collect_batches(stream).await;
1403
1404        #[rustfmt::skip]
1405        let expected = [
1406            "+---+-----+",
1407            "| a | b   |",
1408            "+---+-----+",
1409            "| 1 | 0.0 |",
1410            "+---+-----+",
1411        ];
1412        assert_batches_eq!(expected, &batches);
1413        let metrics = opener.metrics.clone_inner();
1414        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
1415        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 2);
1416    }
1417}