Struct ParquetSource
pub struct ParquetSource {
    pub(crate) table_parquet_options: TableParquetOptions,
    pub(crate) metrics: ExecutionPlanMetricsSet,
    pub(crate) table_schema: Option<TableSchema>,
    pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
    pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) batch_size: Option<usize>,
    pub(crate) metadata_size_hint: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
}

Execution plan for reading one or more Parquet files.

            ▲
            │
            │  Produce a stream of
            │  RecordBatches
            │
┌───────────────────────┐
│                       │
│     DataSourceExec    │
│                       │
└───────────────────────┘
            ▲
            │  Asynchronously read from one
            │  or more parquet files via
            │  ObjectStore interface
            │
            │
  .───────────────────.
 │                     )
 │`───────────────────'│
 │    ObjectStore      │
 │.───────────────────.│
 │                     )
  `───────────────────'

§Example: Create a DataSourceExec


// `object_store_url`, `file_schema`, and `predicate` are assumed to be in scope
let source = Arc::new(
    ParquetSource::default()
        .with_predicate(predicate)
);
// Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
    .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024))
    .build();
let exec = DataSourceExec::from_data_source(config);

§Features

Supports the following optimizations:

  • Concurrent reads: reads from one or more files in parallel as multiple partitions, including concurrently reading multiple row groups from a single file.

  • Predicate push down: skips row groups, pages, rows based on metadata and late materialization. See “Predicate Pushdown” below.

  • Projection pushdown: reads and decodes only the columns required.

  • Limit pushdown: stop execution early after some number of rows are read.

  • Custom readers: customize reading parquet files, e.g. to cache metadata, coalesce I/O operations, etc. See ParquetFileReaderFactory for more details.

  • Schema evolution: read parquet files with different schemas into a unified table schema. See SchemaAdapterFactory for more details.

  • metadata_size_hint: controls the number of bytes read from the end of the file in the initial I/O when the default ParquetFileReaderFactory is used. If a custom reader is used, it supplies the metadata directly and this parameter is ignored. See ParquetSource::with_metadata_size_hint for more details, and the configuration sketch after this list.

  • User provided ParquetAccessPlans to skip row groups and/or pages based on external information. See “Implementing External Indexes” below.
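
The following sketch is not part of the original documentation; it shows how several of these knobs can be set on a ParquetSource (all three builder methods are documented under “Implementations” below):

// A minimal sketch, assuming `ParquetSource` is in scope
let source = ParquetSource::default()
    // request the last 512 KiB of the file in the initial read so the footer
    // and page index usually arrive in a single request
    .with_metadata_size_hint(512 * 1024)
    // evaluate the predicate inside the parquet decoder (late materialization)
    .with_pushdown_filters(true)
    // allow the decoder to reorder predicate expressions to reduce evaluation cost
    .with_reorder_filters(true);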

§Predicate Pushdown

DataSourceExec uses the provided PhysicalExpr predicate as a filter to skip reading unnecessary data and improve query performance using several techniques:

  • Row group pruning: skips entire row groups based on min/max statistics found in ParquetMetaData and any Bloom filters that are present.

  • Page pruning: skips individual pages within a ColumnChunk using the Parquet PageIndex, if present.

  • Row filtering: skips rows within a page using a form of late materialization. When possible, predicates are applied by the parquet decoder during decode (see ArrowPredicate and RowFilter for more details). This is only enabled if ParquetScanOptions::pushdown_filters is set to true.

Note: If the predicate cannot be used to accelerate the scan, it is ignored (no error is raised on predicate evaluation errors).
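
As a concrete (hedged) illustration of supplying such a predicate, the sketch below builds a PhysicalExpr for `a > 5` using helpers from the datafusion-physical-expr crate; the import paths are assumptions that may differ between DataFusion versions:

use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::logical_expr::Operator;
use datafusion::physical_expr::expressions::{binary, col, lit};

// schema of the parquet file: a single Int32 column "a"
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
// physical expression equivalent to `a > 5`
let predicate = binary(col("a", &schema)?, Operator::Gt, lit(5i32), &schema)?;
// attach it so row groups, pages, and rows that cannot match are skipped
let source = Arc::new(ParquetSource::default().with_predicate(predicate));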

§Example: rewriting DataSourceExec

You can modify a DataSourceExec using ParquetSource, for example to change files or add a predicate.


// Split a single DataSourceExec into multiple DataSourceExecs, one for each file
// (`parquet_exec()` is assumed to return an existing parquet DataSourceExec)
let exec = parquet_exec();
let data_source = exec.data_source();
let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let existing_file_groups = &base_config.file_groups;
let new_execs = existing_file_groups
    .iter()
    .map(|file_group| {
        // create a new exec by copying the existing exec's source config
        let new_config = FileScanConfigBuilder::from(base_config.clone())
            .with_file_groups(vec![file_group.clone()])
            .build();
        DataSourceExec::from_data_source(new_config)
    })
    .collect::<Vec<_>>();

§Implementing External Indexes

It is possible to restrict which row groups, and which selections within those row groups, the DataSourceExec will consider by providing an initial ParquetAccessPlan as an extension on PartitionedFile. This can be used to implement external indexes on top of parquet files and select only portions of the files.

The DataSourceExec will try to reduce any provided ParquetAccessPlan further based on the contents of ParquetMetaData and other settings.

§Example of providing a ParquetAccessPlan


// create an access plan to scan row groups 0, 1 and 3 and skip row groups 2 and 4
let mut access_plan = ParquetAccessPlan::new_all(5);
access_plan.skip(2);
access_plan.skip(4);
// provide the plan as an extension to the FileScanConfig
let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
    .with_extensions(Arc::new(access_plan));
// create a FileScanConfig to scan this file
let config = FileScanConfigBuilder::new(
        ObjectStoreUrl::local_filesystem(),
        schema(),
        Arc::new(ParquetSource::default()),
    )
    .with_file(partitioned_file)
    .build();
// this parquet DataSourceExec will not even try to read row groups 2 and 4.
// Additional pruning based on predicates may also happen
let exec = DataSourceExec::from_data_source(config);
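
An access plan can also select ranges of rows within a row group. The sketch below is an illustration that assumes the ParquetAccessPlan::scan_selection method and the RowSelection / RowSelector types from the parquet crate's arrow_reader module:

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

let mut access_plan = ParquetAccessPlan::new_all(2);
// within row group 1, skip the first 100 rows and scan the next 50
let selection = RowSelection::from(vec![
    RowSelector::skip(100),
    RowSelector::select(50),
]);
access_plan.scan_selection(1, selection);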

For a complete example, see the advanced_parquet_index example.

§Execution Overview

  • Step 1: DataSourceExec::execute is called, returning a FileStream configured to open parquet files with a ParquetOpener.

  • Step 2: When the stream is polled, the ParquetOpener is called to open the file.

  • Step 3: The ParquetOpener gets the ParquetMetaData (file metadata) via ParquetFileReaderFactory, creating a ParquetAccessPlan by applying predicates to metadata. The plan and projections are used to determine what pages must be read.

  • Step 4: The stream begins reading data, fetching the required parquet pages and incrementally decoding them, applying any row filters (see Self::with_pushdown_filters).

  • Step 5: As each RecordBatch is read, it may be adapted by a SchemaAdapter to match the table schema. By default missing columns are filled with nulls, but this can be customized via SchemaAdapterFactory.
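
To make these steps concrete, here is a minimal sketch (not from the original documentation) of driving such a plan to completion; it assumes a SessionContext named `ctx` and the `exec` built in the earlier example:

use datafusion::physical_plan::collect;

// `collect` calls DataSourceExec::execute for each partition and drains the
// resulting RecordBatch streams into a Vec
let task_ctx = ctx.task_ctx();
let batches = collect(exec, task_ctx).await?;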

Fields§

table_parquet_options: TableParquetOptions
metrics: ExecutionPlanMetricsSet
table_schema: Option<TableSchema>
predicate: Option<Arc<dyn PhysicalExpr>>
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
batch_size: Option<usize>
metadata_size_hint: Option<usize>
projected_statistics: Option<Statistics>
encryption_factory: Option<Arc<dyn EncryptionFactory>>

Implementations§

impl ParquetSource

pub fn new(table_parquet_options: TableParquetOptions) -> ParquetSource

Create a new ParquetSource to read the data specified in the file scan configuration with the provided TableParquetOptions. If default values are going to be used, use ParquetSource::default() instead.

pub fn with_metadata_size_hint(self, metadata_size_hint: usize) -> ParquetSource

Set the metadata size hint

This value determines how many bytes at the end of the file the default ParquetFileReaderFactory will request in the initial IO. If this is too small, the ParquetSource will need to make additional IO requests to read the footer.

pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> ParquetSource

Set predicate information

pub fn with_encryption_factory( self, encryption_factory: Arc<dyn EncryptionFactory>, ) -> ParquetSource

Set the encryption factory to use to generate file decryption properties

pub fn table_parquet_options(&self) -> &TableParquetOptions

Options passed to the parquet reader for this scan

pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>>

👎Deprecated since 50.2.0: use filter instead

Optional predicate.

pub fn parquet_file_reader_factory( &self, ) -> Option<&Arc<dyn ParquetFileReaderFactory>>

Return the optional file reader factory.

pub fn with_parquet_file_reader_factory( self, parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>, ) -> ParquetSource

Optional user defined parquet file reader factory.

pub fn with_pushdown_filters(self, pushdown_filters: bool) -> ParquetSource

If true, the predicate will be used during the parquet scan. Defaults to false.

pub fn with_reorder_filters(self, reorder_filters: bool) -> ParquetSource

If true, the RowFilter made by pushdown_filters may try to minimize the cost of filter evaluation by reordering the predicate Exprs. If false, the predicates are applied in the same order as specified in the query. Defaults to false.

pub fn with_enable_page_index(self, enable_page_index: bool) -> ParquetSource

If enabled, the reader will read the page index. This is used to optimize filter pushdown via RowSelector and RowFilter by eliminating unnecessary IO and decoding.

pub fn with_bloom_filter_on_read( self, bloom_filter_on_read: bool, ) -> ParquetSource

If enabled, the reader will use bloom filters (when present) to skip row groups that cannot match the predicate.

pub fn with_bloom_filter_on_write( self, enable_bloom_filter_on_write: bool, ) -> ParquetSource

If enabled, the writer will write bloom filters when writing parquet files.

pub fn max_predicate_cache_size(&self) -> Option<usize>

Return the maximum predicate cache size, in bytes, used when pushdown_filters is enabled.

pub fn apply_schema_adapter( self, conf: &FileScanConfig, ) -> Result<Arc<dyn FileSource>, DataFusionError>

Applies schema adapter factory from the FileScanConfig if present.

§Arguments
  • conf - FileScanConfig that may contain a schema adapter factory
§Returns

The converted FileSource with the schema adapter factory applied, if one was provided.

Trait Implementations§

impl Clone for ParquetSource

fn clone(&self) -> ParquetSource

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl Debug for ParquetSource

fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), Error>

Formats the value using the given formatter.

impl Default for ParquetSource

fn default() -> ParquetSource

Returns the “default value” for a type.

impl FileSource for ParquetSource

fn create_file_opener( &self, object_store: Arc<dyn ObjectStore>, base_config: &FileScanConfig, partition: usize, ) -> Arc<dyn FileOpener>

Creates a dyn FileOpener based on the given parameters.

fn as_any(&self) -> &(dyn Any + 'static)

Returns self as Any, so the source can be downcast to its concrete type.

fn filter(&self) -> Option<Arc<dyn PhysicalExpr>>

Returns the filter expression that will be applied during the file scan.
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>

Initialize new type with batch size configuration
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource>

Initialize new instance with a new schema
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>

Initialize new instance with projected statistics
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource>

Initialize new instance with projection information
fn metrics(&self) -> &ExecutionPlanMetricsSet

Return execution plan metrics
fn statistics(&self) -> Result<Statistics, DataFusionError>

Return projected statistics
fn file_type(&self) -> &str

String representation of file source such as “csv”, “json”, “parquet”
fn fmt_extra( &self, t: DisplayFormatType, f: &mut Formatter<'_>, ) -> Result<(), Error>

Format FileType specific information
fn try_pushdown_filters( &self, filters: Vec<Arc<dyn PhysicalExpr>>, config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>, DataFusionError>

Try to push down filters into this FileSource. See ExecutionPlan::handle_child_pushdown_result for more details.
fn with_schema_adapter_factory( &self, schema_adapter_factory: Arc<dyn SchemaAdapterFactory>, ) -> Result<Arc<dyn FileSource>, DataFusionError>

Set optional schema adapter factory.
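
As a hedged example, the default factory can be attached explicitly (equivalent to the implicit default); the DefaultSchemaAdapterFactory name and its import path are assumptions that may vary by DataFusion version:

use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory;

// the FileSource trait must be in scope to call this method on ParquetSource
let source = ParquetSource::default()
    .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory))?;
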
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>

Returns the current schema adapter factory, if set.

fn repartitioned( &self, target_partitions: usize, repartition_file_min_size: usize, output_ordering: Option<LexOrdering>, config: &FileScanConfig, ) -> Result<Option<FileScanConfig>, DataFusionError>

If supported by the FileSource, redistribute files across partitions according to their size. Allows custom file formats to implement their own repartitioning logic.

impl From<ParquetSource> for Arc<dyn FileSource>

Allows easy conversion from ParquetSource to Arc<dyn FileSource>

fn from(source: ParquetSource) -> Arc<dyn FileSource>

Converts to this type from the input type.
