datafusion_datasource_parquet/source.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! ParquetSource implementation for reading parquet files
use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::opener::build_pruning_predicates;
use crate::opener::ParquetOpener;
use crate::row_filter::can_expr_be_pushed_down_with_schemas;
use crate::DefaultParquetFileReaderFactory;
use crate::ParquetFileReaderFactory;
use datafusion_common::config::ConfigOptions;
#[cfg(feature = "parquet_encryption")]
use datafusion_common::config::EncryptionFactoryOptions;
use datafusion_datasource::as_file_source;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};

use arrow::datatypes::TimeUnit;
use datafusion_common::config::TableParquetOptions;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::TableSchema;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PushedDownPredicate,
};
use datafusion_physical_plan::metrics::Count;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;

#[cfg(feature = "parquet_encryption")]
use datafusion_execution::parquet_encryption::EncryptionFactory;
use itertools::Itertools;
use object_store::ObjectStore;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::decrypt::FileDecryptionProperties;

/// Execution plan for reading one or more Parquet files.
64///
65/// ```text
66/// ▲
67/// │
68/// │ Produce a stream of
69/// │ RecordBatches
70/// │
71/// ┌───────────────────────┐
72/// │ │
73/// │ DataSourceExec │
74/// │ │
75/// └───────────────────────┘
76/// ▲
77/// │ Asynchronously read from one
78/// │ or more parquet files via
79/// │ ObjectStore interface
80/// │
81/// │
82/// .───────────────────.
83/// │ )
84/// │`───────────────────'│
85/// │ ObjectStore │
86/// │.───────────────────.│
87/// │ )
88/// `───────────────────'
89/// ```
90///
91/// # Example: Create a `DataSourceExec`
92/// ```
93/// # use std::sync::Arc;
94/// # use arrow::datatypes::Schema;
95/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
96/// # use datafusion_datasource_parquet::source::ParquetSource;
97/// # use datafusion_datasource::PartitionedFile;
98/// # use datafusion_execution::object_store::ObjectStoreUrl;
99/// # use datafusion_physical_expr::expressions::lit;
100/// # use datafusion_datasource::source::DataSourceExec;
101/// # use datafusion_common::config::TableParquetOptions;
102///
103/// # let file_schema = Arc::new(Schema::empty());
104/// # let object_store_url = ObjectStoreUrl::local_filesystem();
105/// # let predicate = lit(true);
106/// let source = Arc::new(
107/// ParquetSource::default()
108/// .with_predicate(predicate)
109/// );
110/// // Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
111/// let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
112/// .with_file(PartitionedFile::new("file1.parquet", 100*1024*1024)).build();
113/// let exec = DataSourceExec::from_data_source(config);
114/// ```
115///
116/// # Features
117///
118/// Supports the following optimizations:
119///
120/// * Concurrent reads: reads from one or more files in parallel as multiple
121/// partitions, including concurrently reading multiple row groups from a single
122/// file.
123///
124/// * Predicate push down: skips row groups, pages, rows based on metadata
125/// and late materialization. See "Predicate Pushdown" below.
126///
127/// * Projection pushdown: reads and decodes only the columns required.
128///
129/// * Limit pushdown: stop execution early after some number of rows are read.
130///
131/// * Custom readers: customize reading parquet files, e.g. to cache metadata,
132/// coalesce I/O operations, etc. See [`ParquetFileReaderFactory`] for more
133/// details.
134///
135/// * Schema evolution: read parquet files with different schemas into a unified
136/// table schema. See [`SchemaAdapterFactory`] for more details.
137///
138/// * metadata_size_hint: controls the number of bytes read from the end of the
139/// file in the initial I/O when the default [`ParquetFileReaderFactory`]. If a
140/// custom reader is used, it supplies the metadata directly and this parameter
141/// is ignored. [`ParquetSource::with_metadata_size_hint`] for more details.
142///
143/// * User provided `ParquetAccessPlan`s to skip row groups and/or pages
144/// based on external information. See "Implementing External Indexes" below
145///
146/// # Predicate Pushdown
147///
148/// `DataSourceExec` uses the provided [`PhysicalExpr`] predicate as a filter to
149/// skip reading unnecessary data and improve query performance using several techniques:
150///
151/// * Row group pruning: skips entire row groups based on min/max statistics
152/// found in [`ParquetMetaData`] and any Bloom filters that are present.
153///
154/// * Page pruning: skips individual pages within a ColumnChunk using the
155/// [Parquet PageIndex], if present.
156///
157/// * Row filtering: skips rows within a page using a form of late
158/// materialization. When possible, predicates are applied by the parquet
159/// decoder *during* decode (see [`ArrowPredicate`] and [`RowFilter`] for more
160/// details). This is only enabled if `ParquetScanOptions::pushdown_filters` is set to true.
161///
162/// Note: If the predicate can not be used to accelerate the scan, it is ignored
163/// (no error is raised on predicate evaluation errors).
164///
165/// [`ArrowPredicate`]: parquet::arrow::arrow_reader::ArrowPredicate
166/// [`RowFilter`]: parquet::arrow::arrow_reader::RowFilter
167/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
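///
/// ## Example: enabling filter pushdown
///
/// A minimal sketch of turning on row-level filter pushdown so predicates are
/// evaluated by the parquet decoder during the scan (it is off by default):
///
/// ```
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// # use datafusion_physical_expr::expressions::lit;
/// let source = ParquetSource::default()
///     .with_predicate(lit(true))
///     .with_pushdown_filters(true);
/// ```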
168///
169/// # Example: rewriting `DataSourceExec`
170///
171/// You can modify a `DataSourceExec` using [`ParquetSource`], for example
172/// to change files or add a predicate.
173///
174/// ```no_run
175/// # use std::sync::Arc;
176/// # use arrow::datatypes::Schema;
177/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
178/// # use datafusion_datasource::PartitionedFile;
179/// # use datafusion_datasource::source::DataSourceExec;
180///
181/// # fn parquet_exec() -> DataSourceExec { unimplemented!() }
182/// // Split a single DataSourceExec into multiple DataSourceExecs, one for each file
183/// let exec = parquet_exec();
184/// let data_source = exec.data_source();
185/// let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
186/// let existing_file_groups = &base_config.file_groups;
187/// let new_execs = existing_file_groups
188/// .iter()
189/// .map(|file_group| {
190/// // create a new exec by copying the existing exec's source config
191/// let new_config = FileScanConfigBuilder::from(base_config.clone())
192/// .with_file_groups(vec![file_group.clone()])
193/// .build();
194///
195/// (DataSourceExec::from_data_source(new_config))
196/// })
197/// .collect::<Vec<_>>();
198/// ```
199///
200/// # Implementing External Indexes
201///
202/// It is possible to restrict the row groups and selections within those row
203/// groups that the DataSourceExec will consider by providing an initial
204/// `ParquetAccessPlan` as `extensions` on `PartitionedFile`. This can be
205/// used to implement external indexes on top of parquet files and select only
206/// portions of the files.
207///
208/// The `DataSourceExec` will try and reduce any provided `ParquetAccessPlan`
209/// further based on the contents of `ParquetMetadata` and other settings.
210///
211/// ## Example of providing a ParquetAccessPlan
212///
213/// ```
214/// # use std::sync::Arc;
215/// # use arrow::datatypes::{Schema, SchemaRef};
216/// # use datafusion_datasource::PartitionedFile;
217/// # use datafusion_datasource_parquet::ParquetAccessPlan;
218/// # use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
219/// # use datafusion_datasource_parquet::source::ParquetSource;
220/// # use datafusion_execution::object_store::ObjectStoreUrl;
221/// # use datafusion_datasource::source::DataSourceExec;
222///
223/// # fn schema() -> SchemaRef {
224/// # Arc::new(Schema::empty())
225/// # }
226/// // create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
227/// let mut access_plan = ParquetAccessPlan::new_all(5);
228/// access_plan.skip(2);
229/// access_plan.skip(4);
230/// // provide the plan as extension to the FileScanConfig
231/// let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
232/// .with_extensions(Arc::new(access_plan));
233/// // create a FileScanConfig to scan this file
234/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema(), Arc::new(ParquetSource::default()))
235/// .with_file(partitioned_file).build();
236/// // this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
237/// // pruning based on predicates may also happen
238/// let exec = DataSourceExec::from_data_source(config);
239/// ```
240///
241/// For a complete example, see the [`advanced_parquet_index` example]).
242///
243/// [`parquet_index_advanced` example]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/advanced_parquet_index.rs
244///
245/// # Execution Overview
246///
247/// * Step 1: `DataSourceExec::execute` is called, returning a `FileStream`
248/// configured to open parquet files with a `ParquetOpener`.
249///
250/// * Step 2: When the stream is polled, the `ParquetOpener` is called to open
251/// the file.
252///
253/// * Step 3: The `ParquetOpener` gets the [`ParquetMetaData`] (file metadata)
254/// via [`ParquetFileReaderFactory`], creating a `ParquetAccessPlan` by
255/// applying predicates to metadata. The plan and projections are used to
256/// determine what pages must be read.
257///
258/// * Step 4: The stream begins reading data, fetching the required parquet
259/// pages incrementally decoding them, and applying any row filters (see
260/// [`Self::with_pushdown_filters`]).
261///
262/// * Step 5: As each [`RecordBatch`] is read, it may be adapted by a
263/// [`SchemaAdapter`] to match the table schema. By default missing columns are
264/// filled with nulls, but this can be customized via [`SchemaAdapterFactory`].
265///
266/// [`RecordBatch`]: arrow::record_batch::RecordBatch
267/// [`SchemaAdapter`]: datafusion_datasource::schema_adapter::SchemaAdapter
268/// [`ParquetMetadata`]: parquet::file::metadata::ParquetMetaData
269#[derive(Clone, Default, Debug)]
270pub struct ParquetSource {
271 /// Options for reading Parquet files
272 pub(crate) table_parquet_options: TableParquetOptions,
273 /// Optional metrics
274 pub(crate) metrics: ExecutionPlanMetricsSet,
275 /// The schema of the file.
276 /// In particular, this is the schema of the table without partition columns,
277 /// *not* the physical schema of the file.
278 pub(crate) table_schema: Option<TableSchema>,
279 /// Optional predicate for row filtering during parquet scan
280 pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
281 /// Optional user defined parquet file reader factory
282 pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
283 /// Optional user defined schema adapter
284 pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
285 /// Batch size configuration
286 pub(crate) batch_size: Option<usize>,
287 /// Optional hint for the size of the parquet metadata
288 pub(crate) metadata_size_hint: Option<usize>,
289 pub(crate) projected_statistics: Option<Statistics>,
290 #[cfg(feature = "parquet_encryption")]
291 pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
292}
293
294impl ParquetSource {
295 /// Create a new ParquetSource to read the data specified in the file scan
296 /// configuration with the provided `TableParquetOptions`.
297 /// if default values are going to be used, use `ParguetConfig::default()` instead
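    ///
    /// # Example
    ///
    /// A minimal sketch: start from default [`TableParquetOptions`], flip one
    /// option, and build the source from it.
    ///
    /// ```
    /// # use datafusion_common::config::TableParquetOptions;
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// let mut options = TableParquetOptions::default();
    /// options.global.pushdown_filters = true;
    /// let source = ParquetSource::new(options);
    /// ```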
    pub fn new(table_parquet_options: TableParquetOptions) -> Self {
        Self {
            table_parquet_options,
            ..Self::default()
        }
    }

    /// Set the metadata size hint
    ///
    /// This value determines how many bytes at the end of the file the default
    /// [`ParquetFileReaderFactory`] will request in the initial IO. If this is
    /// too small, the ParquetSource will need to make additional IO requests to
    /// read the footer.
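    ///
    /// # Example
    ///
    /// A minimal sketch: ask the default reader factory to fetch the last
    /// 512 KiB of the file in the initial read, which typically covers the
    /// footer and any page indexes.
    ///
    /// ```
    /// # use datafusion_datasource_parquet::source::ParquetSource;
    /// let source = ParquetSource::default().with_metadata_size_hint(512 * 1024);
    /// ```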
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
        self.metadata_size_hint = Some(metadata_size_hint);
        self
    }

    /// Set predicate information
    pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
        let mut conf = self.clone();
        conf.predicate = Some(Arc::clone(&predicate));
        conf
    }

    /// Set the encryption factory to use to generate file decryption properties
    #[cfg(feature = "parquet_encryption")]
    pub fn with_encryption_factory(
        mut self,
        encryption_factory: Arc<dyn EncryptionFactory>,
    ) -> Self {
        self.encryption_factory = Some(encryption_factory);
        self
    }

    /// Options passed to the parquet reader for this scan
    pub fn table_parquet_options(&self) -> &TableParquetOptions {
        &self.table_parquet_options
    }

    /// Optional predicate.
    #[deprecated(since = "50.2.0", note = "use `filter` instead")]
    pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>> {
        self.predicate.as_ref()
    }

    /// Return the optional file reader factory.
    pub fn parquet_file_reader_factory(
        &self,
    ) -> Option<&Arc<dyn ParquetFileReaderFactory>> {
        self.parquet_file_reader_factory.as_ref()
    }

    /// Set an optional user defined parquet file reader factory.
    pub fn with_parquet_file_reader_factory(
        mut self,
        parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
    ) -> Self {
        self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
        self
    }

    /// If true, the predicate will be used during the parquet scan.
    /// Defaults to false.
    pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
        self.table_parquet_options.global.pushdown_filters = pushdown_filters;
        self
    }

    /// Return the value described in [`Self::with_pushdown_filters`]
    pub(crate) fn pushdown_filters(&self) -> bool {
        self.table_parquet_options.global.pushdown_filters
    }

    /// If true, the `RowFilter` made by `pushdown_filters` may try to
    /// minimize the cost of filter evaluation by reordering the
    /// predicate [`Expr`]s. If false, the predicates are applied in
    /// the same order as specified in the query. Defaults to false.
    ///
    /// [`Expr`]: datafusion_expr::Expr
    pub fn with_reorder_filters(mut self, reorder_filters: bool) -> Self {
        self.table_parquet_options.global.reorder_filters = reorder_filters;
        self
    }

    /// Return the value described in [`Self::with_reorder_filters`]
    fn reorder_filters(&self) -> bool {
        self.table_parquet_options.global.reorder_filters
    }

    /// If enabled, the reader will read the page index.
    /// This is used to optimize filter pushdown
    /// via `RowSelector` and `RowFilter` by
    /// eliminating unnecessary IO and decoding.
    pub fn with_enable_page_index(mut self, enable_page_index: bool) -> Self {
        self.table_parquet_options.global.enable_page_index = enable_page_index;
        self
    }

    /// Return the value described in [`Self::with_enable_page_index`]
    fn enable_page_index(&self) -> bool {
        self.table_parquet_options.global.enable_page_index
    }

    /// If enabled, the reader will use bloom filters to skip row groups.
    pub fn with_bloom_filter_on_read(mut self, bloom_filter_on_read: bool) -> Self {
        self.table_parquet_options.global.bloom_filter_on_read = bloom_filter_on_read;
        self
    }

    /// If enabled, the writer will write bloom filters.
    pub fn with_bloom_filter_on_write(
        mut self,
        enable_bloom_filter_on_write: bool,
    ) -> Self {
        self.table_parquet_options.global.bloom_filter_on_write =
            enable_bloom_filter_on_write;
        self
    }

    /// Return the value described in [`Self::with_bloom_filter_on_read`]
    fn bloom_filter_on_read(&self) -> bool {
        self.table_parquet_options.global.bloom_filter_on_read
    }

    /// Return the maximum predicate cache size, in bytes, used when
    /// `pushdown_filters` is enabled.
    pub fn max_predicate_cache_size(&self) -> Option<usize> {
        self.table_parquet_options.global.max_predicate_cache_size
    }

    /// Applies the schema adapter factory from the FileScanConfig if present.
    ///
    /// # Arguments
    /// * `conf` - FileScanConfig that may contain a schema adapter factory
    /// # Returns
    /// The converted FileSource with the schema adapter factory applied if provided
    pub fn apply_schema_adapter(
        self,
        conf: &FileScanConfig,
    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
        let file_source: Arc<dyn FileSource> = self.into();

        // If the FileScanConfig.file_source() has a schema adapter factory, apply it
        if let Some(factory) = conf.file_source().schema_adapter_factory() {
            file_source.with_schema_adapter_factory(
                Arc::<dyn SchemaAdapterFactory>::clone(&factory),
            )
        } else {
            Ok(file_source)
        }
    }

    #[cfg(feature = "parquet_encryption")]
    fn get_encryption_factory_with_config(
        &self,
    ) -> Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)> {
        match &self.encryption_factory {
            None => None,
            Some(factory) => Some((
                Arc::clone(factory),
                self.table_parquet_options.crypto.factory_options.clone(),
            )),
        }
    }
}

/// Parses the `datafusion.common.config.ParquetOptions.coerce_int96` string setting
/// into an Arrow [`TimeUnit`].
pub(crate) fn parse_coerce_int96_string(
    str_setting: &str,
) -> datafusion_common::Result<TimeUnit> {
    let str_setting_lower: &str = &str_setting.to_lowercase();

    match str_setting_lower {
        "ns" => Ok(TimeUnit::Nanosecond),
        "us" => Ok(TimeUnit::Microsecond),
        "ms" => Ok(TimeUnit::Millisecond),
        "s" => Ok(TimeUnit::Second),
        _ => Err(DataFusionError::Configuration(format!(
            "Unknown or unsupported parquet coerce_int96: \
             {str_setting}. Valid values are: ns, us, ms, and s."
        ))),
    }
}

/// Allows easy conversion from ParquetSource to Arc<dyn FileSource>
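///
/// # Example
///
/// A minimal sketch of the conversion (via `Into`, which this impl provides):
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_datasource::file::FileSource;
/// # use datafusion_datasource_parquet::source::ParquetSource;
/// let file_source: Arc<dyn FileSource> = ParquetSource::default().into();
/// ```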
impl From<ParquetSource> for Arc<dyn FileSource> {
    fn from(source: ParquetSource) -> Self {
        as_file_source(source)
    }
}

impl FileSource for ParquetSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let projection = base_config
            .file_column_projection_indices()
            .unwrap_or_else(|| (0..base_config.file_schema().fields().len()).collect());

        let (expr_adapter_factory, schema_adapter_factory) = match (
            base_config.expr_adapter_factory.as_ref(),
            self.schema_adapter_factory.as_ref(),
        ) {
            (Some(expr_adapter_factory), Some(schema_adapter_factory)) => {
                // Use both the schema adapter factory and the expr adapter factory.
                // This results in the SchemaAdapter being used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                // but the PhysicalExprAdapterFactory being used for predicate pushdown and stats pruning.
                (
                    Some(Arc::clone(expr_adapter_factory)),
                    Arc::clone(schema_adapter_factory),
                )
            }
            (Some(expr_adapter_factory), None) => {
                // If no custom schema adapter factory is provided but an expr adapter factory is provided, use the expr adapter factory alongside the default schema adapter factory.
                // This means that the PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning, while the default schema adapter factory will be used for projections.
                (
                    Some(Arc::clone(expr_adapter_factory)),
                    Arc::new(DefaultSchemaAdapterFactory) as _,
                )
            }
            (None, Some(schema_adapter_factory)) => {
                // If a custom schema adapter factory is provided but no expr adapter factory is provided, use the custom SchemaAdapter for both projections and predicate pushdown.
                // This maximizes compatibility with existing code that uses the SchemaAdapter API and did not explicitly opt into the PhysicalExprAdapterFactory API.
                (None, Arc::clone(schema_adapter_factory) as _)
            }
            (None, None) => {
                // If no custom schema adapter factory or expr adapter factory is provided, use the default schema adapter factory and the default physical expr adapter factory.
                // This means that the default SchemaAdapter will be used for projections (e.g. a column was selected that is a UInt32 in the file and a UInt64 in the table schema)
                // and the default PhysicalExprAdapterFactory will be used for predicate pushdown and stats pruning.
                // This is the default behavior with no customization and means that most users of DataFusion will be cut over to the new PhysicalExprAdapterFactory API.
                (
                    Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
                    Arc::new(DefaultSchemaAdapterFactory) as _,
                )
            }
        };

        let parquet_file_reader_factory =
            self.parquet_file_reader_factory.clone().unwrap_or_else(|| {
                Arc::new(DefaultParquetFileReaderFactory::new(object_store)) as _
            });

        #[cfg(feature = "parquet_encryption")]
        let file_decryption_properties = self
            .table_parquet_options()
            .crypto
            .file_decryption
            .clone()
            .map(FileDecryptionProperties::from)
            .map(Arc::new);

        let coerce_int96 = self
            .table_parquet_options
            .global
            .coerce_int96
            .as_ref()
            .map(|time_unit| parse_coerce_int96_string(time_unit.as_str()).unwrap());

        Arc::new(ParquetOpener {
            partition_index: partition,
            projection: Arc::from(projection),
            batch_size: self
                .batch_size
                .expect("Batch size must be set before creating ParquetOpener"),
            limit: base_config.limit,
            predicate: self.predicate.clone(),
            logical_file_schema: Arc::clone(base_config.file_schema()),
            partition_fields: base_config.table_partition_cols().clone(),
            metadata_size_hint: self.metadata_size_hint,
            metrics: self.metrics().clone(),
            parquet_file_reader_factory,
            pushdown_filters: self.pushdown_filters(),
            reorder_filters: self.reorder_filters(),
            enable_page_index: self.enable_page_index(),
            enable_bloom_filter: self.bloom_filter_on_read(),
            enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
            schema_adapter_factory,
            coerce_int96,
            #[cfg(feature = "parquet_encryption")]
            file_decryption_properties,
            expr_adapter_factory,
            #[cfg(feature = "parquet_encryption")]
            encryption_factory: self.get_encryption_factory_with_config(),
            max_predicate_cache_size: self.max_predicate_cache_size(),
        })
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.predicate.clone()
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
        Arc::new(Self {
            table_schema: Some(schema),
            ..self.clone()
        })
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.projected_statistics = Some(statistics);
        Arc::new(conf)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn statistics(&self) -> datafusion_common::Result<Statistics> {
        let statistics = &self.projected_statistics;
        let statistics = statistics
            .clone()
            .expect("projected_statistics must be set");
        // When filters are pushed down, we have no way of knowing the exact statistics.
        // Note that the pruning predicate is also a kind of filter pushdown
        // (bloom filters use `pruning_predicate` too).
        // Because filter pushdown may happen dynamically as long as there is a predicate,
        // if we have *any* predicate applied, we can't guarantee the statistics are exact.
        if self.filter().is_some() {
            Ok(statistics.to_inexact())
        } else {
            Ok(statistics)
        }
    }

    fn file_type(&self) -> &str {
        "parquet"
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let predicate_string = self
                    .filter()
                    .map(|p| format!(", predicate={p}"))
                    .unwrap_or_default();

                write!(f, "{predicate_string}")?;

                // Try to build the pruning predicates.
                // These are only generated here because it's useful to have *some*
                // idea of what pushdown is happening when viewing plans.
                // However, it is important to note that these predicates are *not*
                // necessarily the predicates that are actually evaluated:
                // the actual predicates are built in reference to the physical schema of
                // each file, which we do not have at this point and hence cannot use.
                // Instead we use the logical schema of the file (the table schema without partition columns).
                if let (Some(file_schema), Some(predicate)) = (
                    &self.table_schema.as_ref().map(|ts| ts.file_schema()),
                    &self.predicate,
                ) {
                    let predicate_creation_errors = Count::new();
                    if let (Some(pruning_predicate), _) = build_pruning_predicates(
                        Some(predicate),
                        file_schema,
                        &predicate_creation_errors,
                    ) {
                        let mut guarantees = pruning_predicate
                            .literal_guarantees()
                            .iter()
                            .map(|item| format!("{item}"))
                            .collect_vec();
                        guarantees.sort();
                        write!(
                            f,
                            ", pruning_predicate={}, required_guarantees=[{}]",
                            pruning_predicate.predicate_expr(),
                            guarantees.join(", ")
                        )?;
                    }
                };
                Ok(())
            }
            DisplayFormatType::TreeRender => {
                if let Some(predicate) = self.filter() {
                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                }
                Ok(())
            }
        }
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        config: &ConfigOptions,
    ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        let Some(table_schema) = self
            .table_schema
            .as_ref()
            .map(|ts| ts.table_schema())
            .cloned()
        else {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        };
        // Determine if, based on configs, we should push filters down.
        // If either the table / scan itself or the config has pushdown enabled,
        // we will push down the filters.
        // If both are disabled, we will not push down the filters.
        // By default they are both disabled.
        // Regardless of pushdown, we will update the predicate to include the filters
        // because even if scan pushdown is disabled we can still use the filters for stats pruning.
        let config_pushdown_enabled = config.execution.parquet.pushdown_filters;
        let table_pushdown_enabled = self.pushdown_filters();
        let pushdown_filters = table_pushdown_enabled || config_pushdown_enabled;

        let mut source = self.clone();
        let filters: Vec<PushedDownPredicate> = filters
            .into_iter()
            .map(|filter| {
                if can_expr_be_pushed_down_with_schemas(&filter, &table_schema) {
                    PushedDownPredicate::supported(filter)
                } else {
                    PushedDownPredicate::unsupported(filter)
                }
            })
            .collect();
        if filters
            .iter()
            .all(|f| matches!(f.discriminant, PushedDown::No))
        {
            // No filters can be pushed down, so we can just return the remaining filters
            // and avoid replacing the source in the physical plan.
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        }
        let allowed_filters = filters
            .iter()
            .filter_map(|f| match f.discriminant {
                PushedDown::Yes => Some(Arc::clone(&f.predicate)),
                PushedDown::No => None,
            })
            .collect_vec();
        let predicate = match source.predicate {
            Some(predicate) => {
                conjunction(std::iter::once(predicate).chain(allowed_filters))
            }
            None => conjunction(allowed_filters),
        };
        source.predicate = Some(predicate);
        source = source.with_pushdown_filters(pushdown_filters);
        let source = Arc::new(source);
        // If pushdown_filters is false we tell our parents that they still have to handle the filters,
        // even if we updated the predicate to include the filters (they will only be used for stats pruning).
        if !pushdown_filters {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            )
            .with_updated_node(source));
        }
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            filters.iter().map(|f| f.discriminant).collect(),
        )
        .with_updated_node(source))
    }

    fn with_schema_adapter_factory(
        &self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
        Ok(Arc::new(Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self.clone()
        }))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_physical_expr::expressions::lit;

    #[test]
    #[allow(deprecated)]
    fn test_parquet_source_predicate_same_as_filter() {
        let predicate = lit(true);

        let parquet_source = ParquetSource::default().with_predicate(predicate);
        // same value, but `filter()` calls `Arc::clone` internally
        assert_eq!(parquet_source.predicate(), parquet_source.filter().as_ref());
    }
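
    // The following tests are small sketches added for illustration; they
    // exercise helpers defined above using only APIs from this module.

    #[test]
    fn test_parse_coerce_int96_string() {
        // Accepted (case-insensitive) time units map to the matching Arrow `TimeUnit`.
        assert_eq!(parse_coerce_int96_string("ns").unwrap(), TimeUnit::Nanosecond);
        assert_eq!(parse_coerce_int96_string("US").unwrap(), TimeUnit::Microsecond);
        assert_eq!(parse_coerce_int96_string("ms").unwrap(), TimeUnit::Millisecond);
        assert_eq!(parse_coerce_int96_string("s").unwrap(), TimeUnit::Second);
        // Anything else is rejected as a configuration error.
        assert!(parse_coerce_int96_string("minutes").is_err());
    }

    #[test]
    fn test_pushdown_filters_flag_round_trip() {
        // `with_pushdown_filters` writes through to `TableParquetOptions`,
        // which `pushdown_filters()` reads back (disabled by default).
        let source = ParquetSource::default();
        assert!(!source.pushdown_filters());
        assert!(source.with_pushdown_filters(true).pushdown_filters());
    }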
}