pub struct ParquetSource {
pub(crate) table_parquet_options: TableParquetOptions,
pub(crate) metrics: ExecutionPlanMetricsSet,
pub(crate) table_schema: Option<TableSchema>,
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
pub(crate) parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
pub(crate) batch_size: Option<usize>,
pub(crate) metadata_size_hint: Option<usize>,
pub(crate) projected_statistics: Option<Statistics>,
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
}
Execution plan for reading one or more Parquet files.
            ▲
            │
            │  Produce a stream of
            │  RecordBatches
            │
┌───────────────────────┐
│                       │
│     DataSourceExec    │
│                       │
└───────────────────────┘
            ▲
            │  Asynchronously read from one
            │  or more parquet files via
            │  ObjectStore interface
            │
            │
  .───────────────────.
 │                     )
 │`───────────────────'│
 │     ObjectStore     │
 │.───────────────────.│
 │                     )
  `───────────────────'
§Example: Create a DataSourceExec
let source = Arc::new(
    ParquetSource::default()
        .with_predicate(predicate)
);
// Create a DataSourceExec for reading `file1.parquet` with a file size of 100MB
let config = FileScanConfigBuilder::new(object_store_url, file_schema, source)
    .with_file(PartitionedFile::new("file1.parquet", 100 * 1024 * 1024))
    .build();
let exec = DataSourceExec::from_data_source(config);
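The example above assumes predicate, object_store_url, and file_schema are already in scope. One purely illustrative way to construct them (the column name "a", its type, and the use of the local filesystem are assumptions, not part of the original example):
// use arrow::datatypes::{DataType, Field, Schema};
// use datafusion_expr::Operator;
// use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let object_store_url = ObjectStoreUrl::local_filesystem();
// physical predicate equivalent to `a = 5` on the hypothetical column above
let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
    col("a", &file_schema).unwrap(),
    Operator::Eq,
    lit(5i64),
));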
§Features
Supports the following optimizations:
- Concurrent reads: reads from one or more files in parallel as multiple partitions, including concurrently reading multiple row groups from a single file.
- Predicate push down: skips row groups, pages, and rows based on metadata and late materialization. See “Predicate Pushdown” below.
- Projection pushdown: reads and decodes only the columns required.
- Limit pushdown: stops execution early after some number of rows are read.
- Custom readers: customize reading parquet files, e.g. to cache metadata, coalesce I/O operations, etc. See ParquetFileReaderFactory for more details.
- Schema evolution: read parquet files with different schemas into a unified table schema. See SchemaAdapterFactory for more details.
- metadata_size_hint: controls the number of bytes read from the end of the file in the initial I/O when the default ParquetFileReaderFactory is used. If a custom reader is used, it supplies the metadata directly and this parameter is ignored. See ParquetSource::with_metadata_size_hint for more details (and the configuration sketch after this list).
- User provided ParquetAccessPlans to skip row groups and/or pages based on external information. See “Implementing External Indexes” below.
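As a rough illustration of how several of these knobs are set on a ParquetSource (the specific values are arbitrary, not recommendations):
let source = ParquetSource::default()
    // larger initial read, to avoid a second I/O request for the footer and page index
    .with_metadata_size_hint(512 * 1024)
    // evaluate the predicate while decoding rows (late materialization)
    .with_pushdown_filters(true)
    // use the Parquet PageIndex to skip pages
    .with_enable_page_index(true);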
§Predicate Pushdown
DataSourceExec uses the provided PhysicalExpr predicate as a filter to
skip reading unnecessary data and improve query performance using several techniques:
- Row group pruning: skips entire row groups based on min/max statistics found in ParquetMetaData and any Bloom filters that are present.
- Page pruning: skips individual pages within a ColumnChunk using the Parquet PageIndex, if present.
- Row filtering: skips rows within a page using a form of late materialization. When possible, predicates are applied by the parquet decoder during decode (see ArrowPredicate and RowFilter for more details). This is only enabled if ParquetScanOptions::pushdown_filters is set to true.
Note: If the predicate can not be used to accelerate the scan, it is ignored (no error is raised on predicate evaluation errors).
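For example, a minimal sketch of attaching a predicate and opting in to row-level filtering (assumes predicate is an Arc<dyn PhysicalExpr> as in the earlier example; row group and page pruning are applied whenever a predicate is present, while row filtering additionally requires pushdown_filters):
let source = ParquetSource::default()
    // used for row group and page pruning
    .with_predicate(predicate.clone())
    // also apply the predicate while decoding rows
    .with_pushdown_filters(true);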
§Example: rewriting DataSourceExec
You can modify a DataSourceExec using ParquetSource, for example
to change files or add a predicate.
// Split a single DataSourceExec into multiple DataSourceExecs, one for each file
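// (assumes `parquet_exec()` is an in-scope helper returning an existing parquet DataSourceExec)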
let exec = parquet_exec();
let data_source = exec.data_source();
let base_config = data_source.as_any().downcast_ref::<FileScanConfig>().unwrap();
let existing_file_groups = &base_config.file_groups;
let new_execs = existing_file_groups
    .iter()
    .map(|file_group| {
        // create a new exec by copying the existing exec's source config
        let new_config = FileScanConfigBuilder::from(base_config.clone())
            .with_file_groups(vec![file_group.clone()])
            .build();
        DataSourceExec::from_data_source(new_config)
    })
    .collect::<Vec<_>>();
§Implementing External Indexes
It is possible to restrict the row groups and selections within those row
groups that the DataSourceExec will consider by providing an initial
ParquetAccessPlan as extensions on PartitionedFile. This can be
used to implement external indexes on top of parquet files and select only
portions of the files.
The DataSourceExec will try to reduce any provided ParquetAccessPlan
further, based on the contents of the ParquetMetaData and other settings.
§Example of providing a ParquetAccessPlan
// create an access plan to scan row group 0, 1 and 3 and skip row groups 2 and 4
let mut access_plan = ParquetAccessPlan::new_all(5);
access_plan.skip(2);
access_plan.skip(4);
// provide the plan as extension to the FileScanConfig
let partitioned_file = PartitionedFile::new("my_file.parquet", 1234)
    .with_extensions(Arc::new(access_plan));
// create a FileScanConfig to scan this file
let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), schema(), Arc::new(ParquetSource::default()))
    .with_file(partitioned_file)
    .build();
// this parquet DataSourceExec will not even try to read row groups 2 and 4. Additional
// pruning based on predicates may also happen
let exec = DataSourceExec::from_data_source(config);
For a complete example, see the [advanced_parquet_index example].
§Execution Overview
- Step 1: DataSourceExec::execute is called, returning a FileStream configured to open parquet files with a ParquetOpener.
- Step 2: When the stream is polled, the ParquetOpener is called to open the file.
- Step 3: The ParquetOpener gets the ParquetMetaData (file metadata) via ParquetFileReaderFactory, creating a ParquetAccessPlan by applying predicates to metadata. The plan and projections are used to determine what pages must be read.
- Step 4: The stream begins reading data, fetching the required parquet pages and incrementally decoding them, applying any row filters (see Self::with_pushdown_filters).
- Step 5: As each RecordBatch is read, it may be adapted by a SchemaAdapter to match the table schema. By default missing columns are filled with nulls, but this can be customized via SchemaAdapterFactory.
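A sketch of what driving the scan looks like from the caller's side (assumes an async context returning a Result, a default TaskContext, and the exec built in the first example; the steps above happen lazily as the stream is polled):
// use datafusion_execution::TaskContext;
// use futures::TryStreamExt;
let task_ctx = Arc::new(TaskContext::default());
// Step 1: `execute` returns a stream of RecordBatches for partition 0
let stream = exec.execute(0, task_ctx)?;
// Steps 2-5 run as the stream is polled
let batches: Vec<RecordBatch> = stream.try_collect().await?;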
Fields§
§table_parquet_options: TableParquetOptions
§metrics: ExecutionPlanMetricsSet
§table_schema: Option<TableSchema>
§predicate: Option<Arc<dyn PhysicalExpr>>
§parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>
§schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>
§batch_size: Option<usize>
§metadata_size_hint: Option<usize>
§projected_statistics: Option<Statistics>
§encryption_factory: Option<Arc<dyn EncryptionFactory>>
Implementations§
impl ParquetSource
pub fn new(table_parquet_options: TableParquetOptions) -> ParquetSource
Create a new ParquetSource to read the data specified in the file scan
configuration with the provided TableParquetOptions.
If default values are going to be used, use ParquetSource::default() instead.
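For illustration, the following two sources are configured with the same (default) parquet options, since TableParquetOptions implements Default:
let from_options = ParquetSource::new(TableParquetOptions::default());
let from_default = ParquetSource::default();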
pub fn with_metadata_size_hint(self, metadata_size_hint: usize) -> ParquetSource
Set the metadata size hint
This value determines how many bytes at the end of the file the default
ParquetFileReaderFactory will request in the initial IO. If this is
too small, the ParquetSource will need to make additional IO requests to
read the footer.
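For example, a sketch requesting roughly 1 MiB from the end of each file on the initial read (the value is illustrative; a good setting depends on the size of the footer and page index):
let source = ParquetSource::default().with_metadata_size_hint(1024 * 1024);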
pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> ParquetSource
Set predicate information
pub fn with_encryption_factory(
    self,
    encryption_factory: Arc<dyn EncryptionFactory>,
) -> ParquetSource
Set the encryption factory to use to generate file decryption properties
pub fn table_parquet_options(&self) -> &TableParquetOptions
Options passed to the parquet reader for this scan
pub fn predicate(&self) -> Option<&Arc<dyn PhysicalExpr>>
👎 Deprecated since 50.2.0: use filter instead
Optional predicate.
pub fn parquet_file_reader_factory(
    &self,
) -> Option<&Arc<dyn ParquetFileReaderFactory>>
Return the optional file reader factory.
pub fn with_parquet_file_reader_factory(
    self,
    parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
) -> ParquetSource
Optional user defined parquet file reader factory.
pub fn with_pushdown_filters(self, pushdown_filters: bool) -> ParquetSource
If true, the predicate will be used during the parquet scan. Defaults to false.
pub fn with_reorder_filters(self, reorder_filters: bool) -> ParquetSource
If true, the RowFilter made by pushdown_filters may try to
minimize the cost of filter evaluation by reordering the
predicate Exprs. If false, the predicates are applied in
the same order as specified in the query. Defaults to false.
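Because reordering only affects the RowFilter built when filter pushdown is active, the two options are typically enabled together; a minimal sketch:
let source = ParquetSource::default()
    .with_pushdown_filters(true)
    .with_reorder_filters(true);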
pub fn with_enable_page_index(self, enable_page_index: bool) -> ParquetSource
If enabled, the reader will read the page index.
This is used to optimize filter pushdown
via RowSelector and RowFilter by
eliminating unnecessary IO and decoding
pub fn with_bloom_filter_on_read(
    self,
    bloom_filter_on_read: bool,
) -> ParquetSource
If enabled, the reader will use Bloom filters (when present) to prune row groups during the scan.
pub fn with_bloom_filter_on_write(
    self,
    enable_bloom_filter_on_write: bool,
) -> ParquetSource
If enabled, the writer will write Bloom filters when writing parquet files.
pub fn max_predicate_cache_size(&self) -> Option<usize>
Return the maximum predicate cache size, in bytes, used when pushdown_filters is enabled.
pub fn apply_schema_adapter(
    self,
    conf: &FileScanConfig,
) -> Result<Arc<dyn FileSource>, DataFusionError>
Trait Implementations§
impl Clone for ParquetSource
fn clone(&self) -> ParquetSource
fn clone_from(&mut self, source: &Self)
impl Debug for ParquetSource
impl Default for ParquetSource
fn default() -> ParquetSource
impl FileSource for ParquetSource
fn create_file_opener(
    &self,
    object_store: Arc<dyn ObjectStore>,
    base_config: &FileScanConfig,
    partition: usize,
) -> Arc<dyn FileOpener>
Creates a dyn FileOpener based on given parameters.
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>>
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>
fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource>
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource>
fn metrics(&self) -> &ExecutionPlanMetricsSet
fn statistics(&self) -> Result<Statistics, DataFusionError>
fn file_type(&self) -> &str
fn fmt_extra(
    &self,
    t: DisplayFormatType,
    f: &mut Formatter<'_>,
) -> Result<(), Error>
fn try_pushdown_filters(
    &self,
    filters: Vec<Arc<dyn PhysicalExpr>>,
    config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>, DataFusionError>
See ExecutionPlan::handle_child_pushdown_result for more details.
fn with_schema_adapter_factory(
    &self,
    schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Result<Arc<dyn FileSource>, DataFusionError>
fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>>
fn repartitioned(
    &self,
    target_partitions: usize,
    repartition_file_min_size: usize,
    output_ordering: Option<LexOrdering>,
    config: &FileScanConfig,
) -> Result<Option<FileScanConfig>, DataFusionError>
If supported by the FileSource, redistribute files across partitions
according to their size. Allows custom file formats to implement their
own repartitioning logic.
impl From<ParquetSource> for Arc<dyn FileSource>
Allows easy conversion from ParquetSource to Arc<dyn FileSource>
fn from(source: ParquetSource) -> Arc<dyn FileSource>
Auto Trait Implementations§
impl Freeze for ParquetSource
impl !RefUnwindSafe for ParquetSource
impl Send for ParquetSource
impl Sync for ParquetSource
impl Unpin for ParquetSource
impl !UnwindSafe for ParquetSource
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left is true.
Converts self into a Right variant of Either<Self, Self> otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self> otherwise.