datafusion_datasource_parquet/
source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! ParquetSource implementation for reading parquet files
use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::opener::build_pruning_predicates;
use crate::opener::ParquetOpener;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};

use arrow::datatypes::TimeUnit;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::TableSchema;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PushedDownPredicate,
};
use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use itertools::Itertools;
use object_store::ObjectStore;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::decrypt::FileDecryptionProperties;

/// Execution plan for reading one or more Parquet files.
///
/// ```text
///             ▲
///             │
///             │  Produce a stream of
///             │  RecordBatches
///             │
/// ┌───────────────────────┐
/// │                       │
/// │     DataSourceExec    │
/// │                       │
/// └───────────────────────┘
///             ▲
///             │  Asynchronously read from one
///             │  or more parquet files via
///             │  ObjectStore interface
///             │
///             │
///   .───────────────────.
///  │                     )
///  │`───────────────────'│
///  │    ObjectStore      │
///  │.───────────────────.│
///  │                     )
///   `───────────────────'
/// ```
///
/// # Example: Create a `DataSourceExec`
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_physical_expr::expressions::lit;
/// # use datafusion_datasource::source::DataSourceExec;
/// # use datafusion_common::config::TableParquetOptions;
///
/// # let file_schema = Arc::new(Schema::empty());
/// # let object_store_url = ObjectStoreUrl::local_filesystem();
/// # let predicate = lit(true);
/// let source = Arc::new(
///     ParquetSource::default()
///     .with_predicate(predicate)
/// );
/// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
///    .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)).build();
/// let exec = DataSourceExec::from_data_source(config);
/// ```
///
/// # Features
///
/// Supports the following optimizations:
///
/// * Concurrent reads: reads from one or more files in parallel as multiple
///   partitions, including concurrently reading multiple row groups from a single
///   file.
///
/// * Predicate push down: skips row groups, pages, and rows based on metadata
///   and late materialization. See "Predicate Pushdown" below.
///
/// * Projection pushdown: reads and decodes only the columns required.
///
/// * Limit pushdown: stops execution early after some number of rows are read.
///
/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
///   coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
///   details.
///
/// * Schema evolution: read parquet files with different schemas into a unified
///   table schema. See [`SchemaAdapterFactory`] for more details.
///
/// * metadata_size_hint: controls the number of bytes read from the end of the
///   file in the initial I/O when the default [`ParquetFileReaderFactory`] is used.
///   If a custom reader is used, it supplies the metadata directly and this
///   parameter is ignored. See [`ParquetSource::with_metadata_size_hint`] for more
///   details.
///
/// * User provided `ParquetAccessPlan`s to skip row groups and/or pages
///   based on external information. See "Implementing External Indexes" below.
///
/// # Predicate Pushdown
///
/// `DataSourceExec` uses the provided [`PhysicalExpr`] predicate as a filter to
/// skip reading unnecessary data and improve query performance using several techniques:
///
/// * Row group pruning: skips entire row groups based on min/max statistics
///   found in [`ParquetMetaData`] and any Bloom filters that are present.
///
/// * Page pruning: skips individual pages within a ColumnChunk using the
///   [Parquet PageIndex], if present.
///
/// * Row filtering: skips rows within a page using a form of late
///   materialization. When possible, predicates are applied by the parquet
///   decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more
///   details). This is only enabled if `ParquetScanOptions::pushdown_filters` is set to true.
///
/// Note: If the predicate cannot be used to accelerate the scan, it is ignored
/// (no error is raised on predicate evaluation errors).
///
/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate
/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
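///
/// A minimal sketch of opting in to row filtering (late materialization) on a
/// `ParquetSource`; it is disabled by default and controlled by
/// [`Self::with_pushdown_filters`]:
///
/// ```
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// # use datafusion_physical_expr::expressions::lit;
/// // the predicate is always available for row group / page pruning; with
/// // pushdown_filters enabled it is also evaluated while decoding rows
/// let source = ParquetSource::default()
///     .with_predicate(lit(true))
///     .with_pushdown_filters(true);
/// ```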
///
/// # Example: rewriting `DataSourceExec`
///
/// You can modify a `DataSourceExec` using [`ParquetSource`], for example
/// to change files or add a predicate.
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::datatypes::Schema;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource::source::DataSourceExec;
///
/// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
/// let exec = parquet_exec();
/// let data_source = exec.data_source();
/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
/// let existing_file_groups = &base_config.file_groups;
/// let new_execs = existing_file_groups
///   .iter()
///   .map(|file_group| {
///     // create a new exec by copying the existing exec's source config
///     let new_config = FileScanConfigBuilder::from(base_config.clone())
///        .with_file_groups(vec![file_group.clone()])
///       .build();
///
///     (DataSourceExec::from_data_source(new_config))
///   })
///   .collect::<Vec<_>>();
/// ```
///
/// # Implementing External Indexes
///
/// It is possible to restrict the row groups and selections within those row
/// groups that the DataSourceExec will consider by providing an initial
/// `ParquetAccessPlan` as `extensions` on `PartitionedFile`. This can be
/// used to implement external indexes on top of parquet files and select only
/// portions of the files.
///
/// The `DataSourceExec` will try to reduce any provided `ParquetAccessPlan`
/// further based on the contents of `ParquetMetaData` and other settings.
///
/// ## Example of providing a ParquetAccessPlan
///
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{Schema, SchemaRef};
/// # use datafusion_datasource::PartitionedFile;
/// # use datafusion_datasource_parquet::ParquetAccessPlan;
/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// # use datafusion_execution::object_store::ObjectStoreUrl;
/// # use datafusion_datasource::source::DataSourceExec;
///
/// # fn schema() -> SchemaRef {
/// #   Arc::new(Schema::empty())
/// # }
/// // create an access plan to scan row groups 0, 1 and 3 and skip row groups 2 and 4
/// let mut access_plan = ParquetAccessPlan::new_all(5);
/// access_plan.skip(2);
/// access_plan.skip(4);
/// // provide the plan as an extension to the FileScanConfig
/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
///   .with_extensions(Arc::new(access_plan));
/// // create a FileScanConfig to scan this file
/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema(), Arc::new(ParquetSource::default()))
///     .with_file(partitioned_file).build();
/// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
/// // pruning based on predicates may also happen
/// let exec = DataSourceExec::from_data_source(config);
/// ```
///
/// For a complete example, see the [`advanced_parquet_index` example].
///
/// [`advanced_parquet_index` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs
///
/// # Execution Overview
///
/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
///   configured to open parquet files with a `ParquetOpener`.
///
/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
///   the file.
///
/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
///   via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
///   applying predicates to metadata. The plan and projections are used to
///   determine what pages must be read.
///
/// * Step 4: The stream begins reading data, fetching the required parquet
///   pages, incrementally decoding them, and applying any row filters (see
///   [`Self::with_pushdown_filters`]).
///
/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
///   [`SchemaAdapter`] to match the table schema. By default missing columns are
///   filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
///
/// [`RecordBatch`]: arrow::record_batch::RecordBatch
/// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
/// [`ParquetMetaData`]: parquet::file::metadata::ParquetMetaData
#[derive(Clone, Default, Debug)]
pub struct ParquetSource {
    /// Options for reading Parquet files
    pub(crate) table_parquet_options: TableParquetOptions,
    /// Optional metrics
    pub(crate) metrics: ExecutionPlanMetricsSet,
    /// The schema of the file.
    /// In particular, this is the schema of the table without partition columns,
    /// *not* the physical schema of the file.
    pub(crate) table_schema: Option<TableSchema>,
    /// Optional predicate for row filtering during parquet scan
    pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Optional user defined parquet file reader factory
    pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
    /// Optional user defined schema adapter
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Batch size configuration
    pub(crate) batch_size: Option<usize>,
    /// Optional hint for the size of the parquet metadata
    pub(crate) metadata_size_hint: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    #[cfg(feature = "parquet_encryption")]
    pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
}

impl ParquetSource {
    /// Create a new ParquetSource to read the data specified in the file scan
    /// configuration with the provided `TableParquetOptions`.
    /// If default values are to be used, use `ParquetSource::default()` instead.
    pub fn new(table_parquet_options: TableParquetOptions) -> Self {
        Self {
            table_parquet_options,
            ..Self::default()
        }
    }

    /// Set the metadata size hint
    ///
    /// This value determines how many bytes at the end of the file the default
    /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
    /// too small, the ParquetSource will need to make additional IO requests to
    /// read the footer.
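    ///
    /// A small usage sketch (the 512 KiB value below is illustrative, not a
    /// recommended setting):
    ///
    /// ```
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// // ask the default reader factory to fetch the last 512 KiB of the file
    /// // in the initial request so the footer can be read without extra round trips
    /// let source = ParquetSource::default().with_metadata_size_hint(512 * 1024);
    /// ```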
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Set predicate information
    pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        let mut conf = self.clone();
        conf.predicate = Some(Arc::clone(&predicate));
        conf
    }

    /// Set the encryption factory to use to generate file decryption properties
    #[cfg(feature = "parquet_encryption")]
    pub fn with_encryption_factory(
        mut self,
        encryption_factory: Arc<dyn EncryptionFactory>,
    ) -> Self {
        self.encryption_factory = Some(encryption_factory);
        self
    }

    /// Options passed to the parquet reader for this scan
    pub fn table_parquet_options(&self) -> &TableParquetOptions {
        &self.table_parquet_options
    }

    /// Optional predicate.
    #[deprecated(since = "50.2.0", note = "use `filter` instead")]
    pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
        self.predicate.as_ref()
    }

    /// Return the optional file reader factory
    pub fn parquet_file_reader_factory(
        &self,
    ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
        self.parquet_file_reader_factory.as_ref()
    }

    /// Set an optional user defined parquet file reader factory.
    pub fn with_parquet_file_reader_factory(
        mut self,
        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
    ) -> Self {
        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
        self
    }

    /// If true, the predicate will be used during the parquet scan.
    /// Defaults to false.
    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
        self.table_parquet_options.global.pushdown_filters = pushdown_filters;
        self
    }

    /// Return the value described in [`Self::with_pushdown_filters`]
    pub(crate) fn pushdown_filters(&self) -> bool {
        self.table_parquet_options.global.pushdown_filters
    }

    /// If true, the `RowFilter` made by `pushdown_filters` may try to
    /// minimize the cost of filter evaluation by reordering the
    /// predicate [`Expr`]s. If false, the predicates are applied in
    /// the same order as specified in the query. Defaults to false.
    ///
    /// [`Expr`]: datafusion_expr::Expr
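    ///
    /// A brief sketch combining reordering with filter pushdown (both default to false):
    ///
    /// ```
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// let source = ParquetSource::default()
    ///     .with_pushdown_filters(true)
    ///     .with_reorder_filters(true);
    /// ```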
    pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
        self.table_parquet_options.global.reorder_filters = reorder_filters;
        self
    }

    /// Return the value described in [`Self::with_reorder_filters`]
    fn reorder_filters(&self) -> bool {
        self.table_parquet_options.global.reorder_filters
    }

    /// If enabled, the reader will read the page index.
    /// This is used to optimize filter pushdown
    /// via `RowSelector` and `RowFilter` by
    /// eliminating unnecessary IO and decoding
    pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
        self.table_parquet_options.global.enable_page_index = enable_page_index;
        self
    }

    /// Return the value described in [`Self::with_enable_page_index`]
    fn enable_page_index(&self) -> bool {
        self.table_parquet_options.global.enable_page_index
    }

    /// If enabled, the reader will read bloom filters (when present) and use
    /// them to prune row groups
    pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
        self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
        self
    }

    /// If enabled, the writer will write bloom filters
    pub fn with_bloom_filter_on_write(
        mut self,
        enable_bloom_filter_on_write: bool,
    ) -> Self {
        self.table_parquet_options.global.bloom_filter_on_write =
            enable_bloom_filter_on_write;
        self
    }

    /// Return the value described in [`Self::with_bloom_filter_on_read`]
    fn bloom_filter_on_read(&self) -> bool {
        self.table_parquet_options.global.bloom_filter_on_read
    }

    /// Return the maximum predicate cache size, in bytes, used when
    /// `pushdown_filters` is enabled
    pub fn max_predicate_cache_size(&self) -> Option<usize> {
        self.table_parquet_options.global.max_predicate_cache_size
    }

    /// Applies schema adapter factory from the FileScanConfig if present.
    ///
    /// # Arguments
    /// * `conf` - FileScanConfig that may contain a schema adapter factory
    /// # Returns
    /// The converted FileSource with schema adapter factory applied if provided
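    ///
    /// A hedged sketch of a typical call site (assumes an already-built
    /// `FileScanConfig` value named `conf`):
    ///
    /// ```rust,ignore
    /// let file_source = ParquetSource::default().apply_schema_adapter(&conf)?;
    /// ```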
    pub fn apply_schema_adapter(
        self,
        conf: &FileScanConfig,
    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
        let file_source: Arc<dyn FileSource> = self.into();

        // If the FileScanConfig.file_source() has a schema adapter factory, apply it
        if let Some(factory) = conf.file_source().schema_adapter_factory() {
            file_source.with_schema_adapter_factory(
                Arc::<dyn SchemaAdapterFactory>::clone(&factory),
            )
        } else {
            Ok(file_source)
        }
    }

    #[cfg(feature = "parquet_encryption")]
    fn get_encryption_factory_with_config(
        &self,
    ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
        match &self.encryption_factory {
            None => None,
            Some(factory) => Some((
                Arc::clone(factory),
                self.table_parquet_options.crypto.factory_options.clone(),
            )),
        }
    }
}

/// Parses the `datafusion.common.config.ParquetOptions.coerce_int96` string setting
/// into an Arrow [`TimeUnit`]
pub(crate) fn parse_coerce_int96_string(
    str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
    let str_setting_lower: &str = &str_setting.to_lowercase();

    match str_setting_lower {
        "ns" => Ok(TimeUnit::Nanosecond),
        "us" => Ok(TimeUnit::Microsecond),
        "ms" => Ok(TimeUnit::Millisecond),
        "s" => Ok(TimeUnit::Second),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet coerce_int96: \
        {str_setting}. Valid values are: ns, us, ms, and s."
        ))),
    }
}

/// Allows easy conversion from ParquetSource to `Arc<dyn FileSource>`
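///
/// For example (a minimal sketch of the conversion via `Into`):
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_datasource::file::FileSource;
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// let file_source: Arc<dyn FileSource> = ParquetSource::default().into();
/// ```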
impl From<ParquetSource> for Arc<dyn FileSource> {
    fn from(source: ParquetSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for ParquetSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let projection = base_config
            .file_column_projection_indices()
            .unwrap_or_else(|| (0..base_config.file_schema().fields().len()).collect());

        let (expr_adapter_factory, schema_adapter_factory) = match (
            base_config.expr_adapter_factory.as_ref(),
            self.schema_adapter_factory.as_ref(),
        ) {
            (Some(expr_adapter_factory), Some(schema_adapter_factory)) => {
                // Use both the schema adapter factory and the expr adapter factory.
                // This results in the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning.
                (
                    Some(Arc::clone(expr_adapter_factory)),
                    Arc::clone(schema_adapter_factory),
                )
            }
            (Some(expr_adapter_factory), None) => {
                // If no custom schema adapter factory is provided but an expr adapter factory is provided, use the expr adapter factory alongside the default schema adapter factory.
                // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections.
                (
                    Some(Arc::clone(expr_adapter_factory)),
                    Arc::new(DefaultSchemaAdapterFactory) as _,
                )
            }
            (None, Some(schema_adapter_factory)) => {
                // If a custom schema adapter factory is provided but no expr adapter factory is provided, use the custom SchemaAdapter for both projections and predicate pushdown.
                // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API.
                (None, Arc::clone(schema_adapter_factory) as _)
            }
            (None, None) => {
                // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory.
                // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning.
                // This is the default behavior with no customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API.
                (
                    Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
                    Arc::new(DefaultSchemaAdapterFactory) as _,
                )
            }
        };

        let parquet_file_reader_factory =
            self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
                Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
            });

        #[cfg(feature = "parquet_encryption")]
        let file_decryption_properties = self
            .table_parquet_options()
            .crypto
            .file_decryption
            .clone()
            .map(FileDecryptionProperties::from)
            .map(Arc::new);

        let coerce_int96 = self
            .table_parquet_options
            .global
            .coerce_int96
            .as_ref()
            .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

        Arc::new(ParquetOpener {
            partition_index: partition,
            projection: Arc::from(projection),
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating ParquetOpener"),
            limit: base_config.limit,
            predicate: self.predicate.clone(),
            logical_file_schema: Arc::clone(base_config.file_schema()),
            partition_fields: base_config.table_partition_cols().clone(),
            metadata_size_hint: self.metadata_size_hint,
            metrics: self.metrics().clone(),
            parquet_file_reader_factory,
            pushdown_filters: self.pushdown_filters(),
            reorder_filters: self.reorder_filters(),
            enable_page_index: self.enable_page_index(),
            enable_bloom_filter: self.bloom_filter_on_read(),
            enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
            schema_adapter_factory,
            coerce_int96,
            #[cfg(feature = "parquet_encryption")]
            file_decryption_properties,
            expr_adapter_factory,
            #[cfg(feature = "parquet_encryption")]
            encryption_factory: self.get_encryption_factory_with_config(),
            max_predicate_cache_size: self.max_predicate_cache_size(),
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.predicate.clone()
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(Self {
            table_schema: Some(schema),
            ..self.clone()
        })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> datafusion_common::Result<Statistics> {
        let statistics = &self.projected_statistics;
        let statistics = statistics
            .clone()
            .expect("projected_statistics must be set");
        // When filters are pushed down, we have no way of knowing the exact statistics.
        // Note that pruning predicate is also a kind of filter pushdown
        // (bloom filters use `pruning_predicate` too).
        // Because filter pushdown may happen dynamically as long as there is a predicate,
        // if we have *any* predicate applied, we can't guarantee the statistics are exact.
        if self.filter().is_some() {
            Ok(statistics.to_inexact())
        } else {
            Ok(statistics)
        }
    }

    fn file_type(&self) -> &str {
        "parquet"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let predicate_string = self
                    .filter()
                    .map(|p| format!(", predicate={p}"))
                    .unwrap_or_default();

                write!(f, "{predicate_string}")?;

                // Try to build the pruning predicates.
                // These are only generated here because it's useful to have *some*
                // idea of what pushdown is happening when viewing plans.
                // However, it is important to note that these predicates are *not*
                // necessarily the predicates that are actually evaluated:
                // the actual predicates are built in reference to the physical schema of
                // each file, which we do not have at this point and hence cannot use.
                // Instead we use the logical schema of the file (the table schema without partition columns).
                if let (Some(file_schema), Some(predicate)) = (
                    &self.table_schema.as_ref().map(|ts| ts.file_schema()),
                    &self.predicate,
                ) {
                    let predicate_creation_errors = Count::new();
                    if let (Some(pruning_predicate), _) = build_pruning_predicates(
                        Some(predicate),
                        file_schema,
                        &predicate_creation_errors,
                    ) {
                        let mut guarantees = pruning_predicate
                            .literal_guarantees()
                            .iter()
                            .map(|item| format!("{item}"))
                            .collect_vec();
                        guarantees.sort();
                        write!(
                            f,
                            ", pruning_predicate={}, required_guarantees=[{}]",
                            pruning_predicate.predicate_expr(),
                            guarantees.join(", ")
                        )?;
                    }
                };
                Ok(())
            }
            DisplayFormatType::TreeRender => {
                if let Some(predicate) = self.filter() {
                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                }
                Ok(())
            }
        }
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        let Some(table_schema) = self
            .table_schema
            .as_ref()
            .map(|ts| ts.table_schema())
            .cloned()
        else {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        };
        // Determine whether, based on the configs, we should push filters down.
        // If either the table / scan itself or the config has pushdown enabled,
        // we will push down the filters.
        // If both are disabled, we will not push down the filters.
        // By default they are both disabled.
        // Regardless of pushdown, we will update the predicate to include the filters
        // because even if scan pushdown is disabled we can still use the filters for stats pruning.
        let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
        let table_pushdown_enabled = self.pushdown_filters();
        let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;

        let mut source = self.clone();
        let filters: Vec<PushedDownPredicate> = filters
            .into_iter()
            .map(|filter| {
                if can_expr_be_pushed_down_with_schemas(&filter, &table_schema) {
                    PushedDownPredicate::supported(filter)
                } else {
                    PushedDownPredicate::unsupported(filter)
                }
            })
            .collect();
        if filters
            .iter()
            .all(|f| matches!(f.discriminant, PushedDown::No))
        {
            // No filters can be pushed down, so we can just return the remaining filters
            // and avoid replacing the source in the physical plan.
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        }
        let allowed_filters = filters
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => Some(Arc::clone(&f.predicate)),
                PushedDown::No => None,
            })
            .collect_vec();
        let predicate = match source.predicate {
            Some(predicate) => {
                conjunction(std::iter::once(predicate).chain(allowed_filters))
            }
            None => conjunction(allowed_filters),
        };
        source.predicate = Some(predicate);
        source = source.with_pushdown_filters(pushdown_filters);
        let source = Arc::new(source);
        // If pushdown_filters is false we tell our parents that they still have to handle the filters,
        // even if we updated the predicate to include the filters (they will only be used for stats pruning).
        if !pushdown_filters {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            )
            .with_updated_node(source));
        }
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            filters.iter().map(|f| f.discriminant).collect(),
        )
        .with_updated_node(source))
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_physical_expr::expressions::lit;

    #[test]
    #[allow(deprecated)]
    fn test_parquet_source_predicate_same_as_filter() {
        let predicate = lit(true);

        let parquet_source = ParquetSource::default().with_predicate(predicate);
        // same value, but filter() calls Arc::clone internally
        assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
    }
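
    // An added, illustrative check that `parse_coerce_int96_string` accepts the
    // documented values case-insensitively and rejects anything else.
    #[test]
    fn test_parse_coerce_int96_string() {
        assert_eq!(parse_coerce_int96_string("ns").unwrap(), TimeUnit::Nanosecond);
        assert_eq!(parse_coerce_int96_string("US").unwrap(), TimeUnit::Microsecond);
        assert_eq!(parse_coerce_int96_string("ms").unwrap(), TimeUnit::Millisecond);
        assert_eq!(parse_coerce_int96_string("s").unwrap(), TimeUnit::Second);
        assert!(parse_coerce_int96_string("seconds").is_err());
    }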
}