datafusion_catalog_listing/
table.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::config::SchemaSource;
19use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
20use crate::{ListingOptions, ListingTableConfig};
21use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
22use async_trait::async_trait;
23use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
24use datafusion_common::stats::Precision;
25use datafusion_common::{
26    internal_datafusion_err, plan_err, project_schema, Constraints, DataFusionError,
27    SchemaExt, Statistics,
28};
29use datafusion_datasource::file::FileSource;
30use datafusion_datasource::file_groups::FileGroup;
31use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::schema_adapter::{
34    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
35};
36use datafusion_datasource::{
37    compute_all_files_statistics, ListingTableUrl, PartitionedFile,
38};
39use datafusion_execution::cache::cache_manager::FileStatisticsCache;
40use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
41use datafusion_expr::dml::InsertOp;
42use datafusion_expr::execution_props::ExecutionProps;
43use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
44use datafusion_physical_expr::create_lex_ordering;
45use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
46use datafusion_physical_expr_common::sort_expr::LexOrdering;
47use datafusion_physical_plan::empty::EmptyExec;
48use datafusion_physical_plan::ExecutionPlan;
49use futures::{future, stream, Stream, StreamExt, TryStreamExt};
50use object_store::ObjectStore;
51use std::any::Any;
52use std::collections::HashMap;
53use std::sync::Arc;
54
55/// Built in [`TableProvider`] that reads data from one or more files as a single table.
56///
57/// The files are read using an  [`ObjectStore`] instance, for example from
58/// local files or objects from AWS S3.
59///
60/// # Features:
61/// * Reading multiple files as a single table
62/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
63/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
64/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
65///   Parquet)
66/// * Statistics collection and pruning based on file metadata
67/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
68/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
69/// * Statistics caching (see [`FileStatisticsCache`])
70///
71/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
72///
73/// # Reading Directories and Hive Style Partitioning
74///
75/// For example, given the `table1` directory (or object store prefix)
76///
77/// ```text
78/// table1
79///  ├── file1.parquet
80///  └── file2.parquet
81/// ```
82///
83/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
84/// a single table, merging the schemas if the files have compatible but not
85/// identical schemas.
86///
87/// Given the `table2` directory (or object store prefix)
88///
89/// ```text
90/// table2
91///  ├── date=2024-06-01
92///  │    ├── file3.parquet
93///  │    └── file4.parquet
94///  └── date=2024-06-02
95///       └── file5.parquet
96/// ```
97///
98/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
99/// `file5.parquet` as a single table, again merging schemas if necessary.
100///
101/// Given the hive style partitioning structure (e.g,. directories named
102/// `date=2024-06-01` and `date=2026-06-02`), `ListingTable` also adds a `date`
103/// column when reading the table:
104/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
105/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
106///
107/// If the query has a predicate like `WHERE date = '2024-06-01'`
108/// only the corresponding directory will be read.
109///
110/// # See Also
111///
112/// 1. [`ListingTableConfig`]: Configuration options
113/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
114///
115/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
116///
117/// # Caching Metadata
118///
119/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
120/// metadata that is needed to execute but expensive to read, such as row
121/// groups and statistics. The cache is scoped to the `SessionContext` and can
122/// be configured via the [runtime config options].
123///
124/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
125///
126/// # Example: Read a directory of parquet files using a [`ListingTable`]
127///
128/// ```no_run
129/// # use datafusion_common::Result;
130/// # use std::sync::Arc;
131/// # use datafusion_catalog::TableProvider;
132/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
133/// # use datafusion_datasource::ListingTableUrl;
134/// # use datafusion_datasource_parquet::file_format::ParquetFormat;/// #
135/// # use datafusion_catalog::Session;
136/// async fn get_listing_table(session: &dyn Session) -> Result<Arc<dyn TableProvider>> {
137/// let table_path = "/path/to/parquet";
138///
139/// // Parse the path
140/// let table_path = ListingTableUrl::parse(table_path)?;
141///
142/// // Create default parquet options
143/// let file_format = ParquetFormat::new();
144/// let listing_options = ListingOptions::new(Arc::new(file_format))
145///   .with_file_extension(".parquet");
146///
147/// // Resolve the schema
148/// let resolved_schema = listing_options
149///    .infer_schema(session, &table_path)
150///    .await?;
151///
152/// let config = ListingTableConfig::new(table_path)
153///   .with_listing_options(listing_options)
154///   .with_schema(resolved_schema);
155///
156/// // Create a new TableProvider
157/// let provider = Arc::new(ListingTable::try_new(config)?);
158///
159/// # Ok(provider)
160/// # }
161/// ```
162#[derive(Debug, Clone)]
163pub struct ListingTable {
164    table_paths: Vec<ListingTableUrl>,
165    /// `file_schema` contains only the columns physically stored in the data files themselves.
166    ///     - Represents the actual fields found in files like Parquet, CSV, etc.
167    ///     - Used when reading the raw data from files
168    file_schema: SchemaRef,
169    /// `table_schema` combines `file_schema` + partition columns
170    ///     - Partition columns are derived from directory paths (not stored in files)
171    ///     - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
172    table_schema: SchemaRef,
173    /// Indicates how the schema was derived (inferred or explicitly specified)
174    schema_source: SchemaSource,
175    /// Options used to configure the listing table such as the file format
176    /// and partitioning information
177    options: ListingOptions,
178    /// The SQL definition for this table, if any
179    definition: Option<String>,
180    /// Cache for collected file statistics
181    collected_statistics: FileStatisticsCache,
182    /// Constraints applied to this table
183    constraints: Constraints,
184    /// Column default expressions for columns that are not physically present in the data files
185    column_defaults: HashMap<String, Expr>,
186    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
187    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
188    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
189    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
190}
191
192impl ListingTable {
193    /// Create new [`ListingTable`]
194    ///
195    /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
196    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
197        // Extract schema_source before moving other parts of the config
198        let schema_source = config.schema_source();
199
200        let file_schema = config
201            .file_schema
202            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;
203
204        let options = config
205            .options
206            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;
207
208        // Add the partition columns to the file schema
209        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
210        for (part_col_name, part_col_type) in &options.table_partition_cols {
211            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
212        }
213
214        let table_schema = Arc::new(
215            builder
216                .finish()
217                .with_metadata(file_schema.metadata().clone()),
218        );
219
220        let table = Self {
221            table_paths: config.table_paths,
222            file_schema,
223            table_schema,
224            schema_source,
225            options,
226            definition: None,
227            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
228            constraints: Constraints::default(),
229            column_defaults: HashMap::new(),
230            schema_adapter_factory: config.schema_adapter_factory,
231            expr_adapter_factory: config.expr_adapter_factory,
232        };
233
234        Ok(table)
235    }
236
237    /// Assign constraints
238    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
239        self.constraints = constraints;
240        self
241    }
242
243    /// Assign column defaults
244    pub fn with_column_defaults(
245        mut self,
246        column_defaults: HashMap<String, Expr>,
247    ) -> Self {
248        self.column_defaults = column_defaults;
249        self
250    }
251
252    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
253    ///
254    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
255    /// multiple times in the same session.
256    ///
257    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
258    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
259        self.collected_statistics =
260            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
261        self
262    }
263
264    /// Specify the SQL definition for this table, if any
265    pub fn with_definition(mut self, definition: Option<String>) -> Self {
266        self.definition = definition;
267        self
268    }
269
270    /// Get paths ref
271    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
272        &self.table_paths
273    }
274
275    /// Get options ref
276    pub fn options(&self) -> &ListingOptions {
277        &self.options
278    }
279
280    /// Get the schema source
281    pub fn schema_source(&self) -> SchemaSource {
282        self.schema_source
283    }
284
285    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
286    ///
287    /// The schema adapter factory is used to create schema adapters that can
288    /// handle schema evolution and type conversions when reading files with
289    /// different schemas than the table schema.
290    ///
291    /// # Example: Adding Schema Evolution Support
292    /// ```rust
293    /// # use std::sync::Arc;
294    /// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions};
295    /// # use datafusion_datasource::ListingTableUrl;
296    /// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};
297    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
298    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
299    /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
300    /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
301    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
302    /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
303    /// # let table = ListingTable::try_new(config).unwrap();
304    /// let table_with_evolution = table
305    ///     .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));
306    /// ```
307    /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory.
308    pub fn with_schema_adapter_factory(
309        self,
310        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
311    ) -> Self {
312        Self {
313            schema_adapter_factory: Some(schema_adapter_factory),
314            ..self
315        }
316    }
317
318    /// Get the [`SchemaAdapterFactory`] for this table
319    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
320        self.schema_adapter_factory.as_ref()
321    }
322
323    /// Creates a schema adapter for mapping between file and table schemas
324    ///
325    /// Uses the configured schema adapter factory if available, otherwise falls back
326    /// to the default implementation.
327    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
328        let table_schema = self.schema();
329        match &self.schema_adapter_factory {
330            Some(factory) => {
331                factory.create_with_projected_schema(Arc::clone(&table_schema))
332            }
333            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
334        }
335    }
336
337    /// Creates a file source and applies schema adapter factory if available
338    fn create_file_source_with_schema_adapter(
339        &self,
340    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
341        let mut source = self.options.format.file_source();
342        // Apply schema adapter to source if available
343        //
344        // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
345        // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
346        if let Some(factory) = &self.schema_adapter_factory {
347            source = source.with_schema_adapter_factory(Arc::clone(factory))?;
348        }
349        Ok(source)
350    }
351
352    /// If file_sort_order is specified, creates the appropriate physical expressions
353    pub fn try_create_output_ordering(
354        &self,
355        execution_props: &ExecutionProps,
356    ) -> datafusion_common::Result<Vec<LexOrdering>> {
357        create_lex_ordering(
358            &self.table_schema,
359            &self.options.file_sort_order,
360            execution_props,
361        )
362    }
363}
364
365// Expressions can be used for partition pruning if they can be evaluated using
366// only the partition columns and there are partition columns.
367fn can_be_evaluated_for_partition_pruning(
368    partition_column_names: &[&str],
369    expr: &Expr,
370) -> bool {
371    !partition_column_names.is_empty()
372        && expr_applicable_for_cols(partition_column_names, expr)
373}
374
375#[async_trait]
376impl TableProvider for ListingTable {
377    fn as_any(&self) -> &dyn Any {
378        self
379    }
380
381    fn schema(&self) -> SchemaRef {
382        Arc::clone(&self.table_schema)
383    }
384
385    fn constraints(&self) -> Option<&Constraints> {
386        Some(&self.constraints)
387    }
388
389    fn table_type(&self) -> TableType {
390        TableType::Base
391    }
392
393    async fn scan(
394        &self,
395        state: &dyn Session,
396        projection: Option<&Vec<usize>>,
397        filters: &[Expr],
398        limit: Option<usize>,
399    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
400        let options = ScanArgs::default()
401            .with_projection(projection.map(|p| p.as_slice()))
402            .with_filters(Some(filters))
403            .with_limit(limit);
404        Ok(self.scan_with_args(state, options).await?.into_inner())
405    }
406
407    async fn scan_with_args<'a>(
408        &self,
409        state: &dyn Session,
410        args: ScanArgs<'a>,
411    ) -> datafusion_common::Result<ScanResult> {
412        let projection = args.projection().map(|p| p.to_vec());
413        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
414        let limit = args.limit();
415
416        // extract types of partition columns
417        let table_partition_cols = self
418            .options
419            .table_partition_cols
420            .iter()
421            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
422            .collect::<datafusion_common::Result<Vec<_>>>()?;
423
424        let table_partition_col_names = table_partition_cols
425            .iter()
426            .map(|field| field.name().as_str())
427            .collect::<Vec<_>>();
428
429        // If the filters can be resolved using only partition cols, there is no need to
430        // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated
431        let (partition_filters, filters): (Vec<_>, Vec<_>) =
432            filters.iter().cloned().partition(|filter| {
433                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
434            });
435
436        // We should not limit the number of partitioned files to scan if there are filters and limit
437        // at the same time. This is because the limit should be applied after the filters are applied.
438        let statistic_file_limit = if filters.is_empty() { limit } else { None };
439
440        let (mut partitioned_file_lists, statistics) = self
441            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
442            .await?;
443
444        // if no files need to be read, return an `EmptyExec`
445        if partitioned_file_lists.is_empty() {
446            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
447            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
448        }
449
450        let output_ordering = self.try_create_output_ordering(state.execution_props())?;
451        match state
452            .config_options()
453            .execution
454            .split_file_groups_by_statistics
455            .then(|| {
456                output_ordering.first().map(|output_ordering| {
457                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
458                        &self.table_schema,
459                        &partitioned_file_lists,
460                        output_ordering,
461                        self.options.target_partitions,
462                    )
463                })
464            })
465            .flatten()
466        {
467            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
468            Some(Ok(new_groups)) => {
469                if new_groups.len() <= self.options.target_partitions {
470                    partitioned_file_lists = new_groups;
471                } else {
472                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
473                }
474            }
475            None => {} // no ordering required
476        };
477
478        let Some(object_store_url) =
479            self.table_paths.first().map(ListingTableUrl::object_store)
480        else {
481            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
482                Schema::empty(),
483            )))));
484        };
485
486        let file_source = self.create_file_source_with_schema_adapter()?;
487
488        // create the execution plan
489        let plan = self
490            .options
491            .format
492            .create_physical_plan(
493                state,
494                FileScanConfigBuilder::new(
495                    object_store_url,
496                    Arc::clone(&self.file_schema),
497                    file_source,
498                )
499                .with_file_groups(partitioned_file_lists)
500                .with_constraints(self.constraints.clone())
501                .with_statistics(statistics)
502                .with_projection_indices(projection)
503                .with_limit(limit)
504                .with_output_ordering(output_ordering)
505                .with_table_partition_cols(table_partition_cols)
506                .with_expr_adapter(self.expr_adapter_factory.clone())
507                .build(),
508            )
509            .await?;
510
511        Ok(ScanResult::new(plan))
512    }
513
514    fn supports_filters_pushdown(
515        &self,
516        filters: &[&Expr],
517    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
518        let partition_column_names = self
519            .options
520            .table_partition_cols
521            .iter()
522            .map(|col| col.0.as_str())
523            .collect::<Vec<_>>();
524        filters
525            .iter()
526            .map(|filter| {
527                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
528                {
529                    // if filter can be handled by partition pruning, it is exact
530                    return Ok(TableProviderFilterPushDown::Exact);
531                }
532
533                Ok(TableProviderFilterPushDown::Inexact)
534            })
535            .collect()
536    }
537
538    fn get_table_definition(&self) -> Option<&str> {
539        self.definition.as_deref()
540    }
541
542    async fn insert_into(
543        &self,
544        state: &dyn Session,
545        input: Arc<dyn ExecutionPlan>,
546        insert_op: InsertOp,
547    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
548        // Check that the schema of the plan matches the schema of this table.
549        self.schema()
550            .logically_equivalent_names_and_types(&input.schema())?;
551
552        let table_path = &self.table_paths()[0];
553        if !table_path.is_collection() {
554            return plan_err!(
555                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
556                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
557            );
558        }
559
560        // Get the object store for the table path.
561        let store = state.runtime_env().object_store(table_path)?;
562
563        let file_list_stream = pruned_partition_list(
564            state,
565            store.as_ref(),
566            table_path,
567            &[],
568            &self.options.file_extension,
569            &self.options.table_partition_cols,
570        )
571        .await?;
572
573        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
574        let keep_partition_by_columns =
575            state.config_options().execution.keep_partition_by_columns;
576
577        // Sink related option, apart from format
578        let config = FileSinkConfig {
579            original_url: String::default(),
580            object_store_url: self.table_paths()[0].object_store(),
581            table_paths: self.table_paths().clone(),
582            file_group,
583            output_schema: self.schema(),
584            table_partition_cols: self.options.table_partition_cols.clone(),
585            insert_op,
586            keep_partition_by_columns,
587            file_extension: self.options().format.get_ext(),
588        };
589
590        let orderings = self.try_create_output_ordering(state.execution_props())?;
591        // It is sufficient to pass only one of the equivalent orderings:
592        let order_requirements = orderings.into_iter().next().map(Into::into);
593
594        self.options()
595            .format
596            .create_writer_physical_plan(input, state, config, order_requirements)
597            .await
598    }
599
600    fn get_column_default(&self, column: &str) -> Option<&Expr> {
601        self.column_defaults.get(column)
602    }
603}
604
605impl ListingTable {
606    /// Get the list of files for a scan as well as the file level statistics.
607    /// The list is grouped to let the execution plan know how the files should
608    /// be distributed to different threads / executors.
609    pub async fn list_files_for_scan<'a>(
610        &'a self,
611        ctx: &'a dyn Session,
612        filters: &'a [Expr],
613        limit: Option<usize>,
614    ) -> datafusion_common::Result<(Vec<FileGroup>, Statistics)> {
615        let store = if let Some(url) = self.table_paths.first() {
616            ctx.runtime_env().object_store(url)?
617        } else {
618            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
619        };
620        // list files (with partitions)
621        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
622            pruned_partition_list(
623                ctx,
624                store.as_ref(),
625                table_path,
626                filters,
627                &self.options.file_extension,
628                &self.options.table_partition_cols,
629            )
630        }))
631        .await?;
632        let meta_fetch_concurrency =
633            ctx.config_options().execution.meta_fetch_concurrency;
634        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
635        // collect the statistics if required by the config
636        let files = file_list
637            .map(|part_file| async {
638                let part_file = part_file?;
639                let statistics = if self.options.collect_stat {
640                    self.do_collect_statistics(ctx, &store, &part_file).await?
641                } else {
642                    Arc::new(Statistics::new_unknown(&self.file_schema))
643                };
644                Ok(part_file.with_statistics(statistics))
645            })
646            .boxed()
647            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);
648
649        let (file_group, inexact_stats) =
650            get_files_with_limit(files, limit, self.options.collect_stat).await?;
651
652        let file_groups = file_group.split_files(self.options.target_partitions);
653        let (mut file_groups, mut stats) = compute_all_files_statistics(
654            file_groups,
655            self.schema(),
656            self.options.collect_stat,
657            inexact_stats,
658        )?;
659
660        let schema_adapter = self.create_schema_adapter();
661        let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;
662
663        stats.column_statistics =
664            schema_mapper.map_column_statistics(&stats.column_statistics)?;
665        file_groups.iter_mut().try_for_each(|file_group| {
666            if let Some(stat) = file_group.statistics_mut() {
667                stat.column_statistics =
668                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
669            }
670            Ok::<_, DataFusionError>(())
671        })?;
672        Ok((file_groups, stats))
673    }
674
675    /// Collects statistics for a given partitioned file.
676    ///
677    /// This method first checks if the statistics for the given file are already cached.
678    /// If they are, it returns the cached statistics.
679    /// If they are not, it infers the statistics from the file and stores them in the cache.
680    async fn do_collect_statistics(
681        &self,
682        ctx: &dyn Session,
683        store: &Arc<dyn ObjectStore>,
684        part_file: &PartitionedFile,
685    ) -> datafusion_common::Result<Arc<Statistics>> {
686        match self
687            .collected_statistics
688            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
689        {
690            Some(statistics) => Ok(statistics),
691            None => {
692                let statistics = self
693                    .options
694                    .format
695                    .infer_stats(
696                        ctx,
697                        store,
698                        Arc::clone(&self.file_schema),
699                        &part_file.object_meta,
700                    )
701                    .await?;
702                let statistics = Arc::new(statistics);
703                self.collected_statistics.put_with_extra(
704                    &part_file.object_meta.location,
705                    Arc::clone(&statistics),
706                    &part_file.object_meta,
707                );
708                Ok(statistics)
709            }
710        }
711    }
712}
713
714/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
715///
716/// This function collects files from the provided stream until either:
717/// 1. The stream is exhausted
718/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
719///
720/// # Arguments
721/// * `files` - A stream of `Result<PartitionedFile>` items to process
722/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
723///   once the accumulated number of rows exceeds this limit
724/// * `collect_stats` - Whether to collect and accumulate statistics from the files
725///
726/// # Returns
727/// A `Result` containing a `FileGroup` with the collected files
728/// and a boolean indicating whether the statistics are inexact.
729///
730/// # Note
731/// The function will continue processing files if statistics are not available or if the
732/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
733/// but files will still be collected.
734async fn get_files_with_limit(
735    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
736    limit: Option<usize>,
737    collect_stats: bool,
738) -> datafusion_common::Result<(FileGroup, bool)> {
739    let mut file_group = FileGroup::default();
740    // Fusing the stream allows us to call next safely even once it is finished.
741    let mut all_files = Box::pin(files.fuse());
742    enum ProcessingState {
743        ReadingFiles,
744        ReachedLimit,
745    }
746
747    let mut state = ProcessingState::ReadingFiles;
748    let mut num_rows = Precision::Absent;
749
750    while let Some(file_result) = all_files.next().await {
751        // Early exit if we've already reached our limit
752        if matches!(state, ProcessingState::ReachedLimit) {
753            break;
754        }
755
756        let file = file_result?;
757
758        // Update file statistics regardless of state
759        if collect_stats {
760            if let Some(file_stats) = &file.statistics {
761                num_rows = if file_group.is_empty() {
762                    // For the first file, just take its row count
763                    file_stats.num_rows
764                } else {
765                    // For subsequent files, accumulate the counts
766                    num_rows.add(&file_stats.num_rows)
767                };
768            }
769        }
770
771        // Always add the file to our group
772        file_group.push(file);
773
774        // Check if we've hit the limit (if one was specified)
775        if let Some(limit) = limit {
776            if let Precision::Exact(row_count) = num_rows {
777                if row_count > limit {
778                    state = ProcessingState::ReachedLimit;
779                }
780            }
781        }
782    }
783    // If we still have files in the stream, it means that the limit kicked
784    // in, and the statistic could have been different had we processed the
785    // files in a different order.
786    let inexact_stats = all_files.next().await.is_some();
787    Ok((file_group, inexact_stats))
788}