datafusion_catalog_listing/table.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::config::SchemaSource;
use crate::helpers::{expr_applicable_for_cols, pruned_partition_list};
use crate::{ListingOptions, ListingTableConfig};
use arrow::datatypes::{Field, Schema, SchemaBuilder, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::stats::Precision;
use datafusion_common::{
    internal_datafusion_err, plan_err, project_schema, Constraints, DataFusionError,
    SchemaExt, Statistics,
};
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory,
};
use datafusion_datasource::{
    compute_all_files_statistics, ListingTableUrl, PartitionedFile,
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion_physical_expr::create_lex_ordering;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;
use futures::{future, stream, Stream, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Built in [`TableProvider`] that reads data from one or more files as a single table.
///
/// The files are read using an [`ObjectStore`] instance, for example from
/// local files or objects from AWS S3.
///
/// # Features:
/// * Reading multiple files as a single table
/// * Hive style partitioning (e.g., directories named `date=2024-06-01`)
/// * Merges schemas from files with compatible but not identical schemas (see [`ListingTableConfig::file_schema`])
/// * `limit`, `filter` and `projection` pushdown for formats that support it (e.g.,
///   Parquet)
/// * Statistics collection and pruning based on file metadata
/// * Pre-existing sort order (see [`ListingOptions::file_sort_order`])
/// * Metadata caching to speed up repeated queries (see [`FileMetadataCache`])
/// * Statistics caching (see [`FileStatisticsCache`])
///
/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
///
/// # Reading Directories and Hive Style Partitioning
///
/// For example, given the `table1` directory (or object store prefix)
///
/// ```text
/// table1
/// ├── file1.parquet
/// └── file2.parquet
/// ```
///
/// A `ListingTable` would read the files `file1.parquet` and `file2.parquet` as
/// a single table, merging the schemas if the files have compatible but not
/// identical schemas.
///
/// Given the `table2` directory (or object store prefix)
///
/// ```text
/// table2
/// ├── date=2024-06-01
/// │   ├── file3.parquet
/// │   └── file4.parquet
/// └── date=2024-06-02
///     └── file5.parquet
/// ```
///
/// A `ListingTable` would read the files `file3.parquet`, `file4.parquet`, and
/// `file5.parquet` as a single table, again merging schemas if necessary.
///
/// Given the hive style partitioning structure (e.g., directories named
/// `date=2024-06-01` and `date=2024-06-02`), `ListingTable` also adds a `date`
/// column when reading the table:
/// * The files in `table2/date=2024-06-01` will have the value `2024-06-01`
/// * The files in `table2/date=2024-06-02` will have the value `2024-06-02`.
///
/// If the query has a predicate like `WHERE date = '2024-06-01'`
/// only the corresponding directory will be read.
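///
/// One way to make the `date` column above available for pruning is to declare
/// it as a partition column on [`ListingOptions`]. A minimal sketch (the column
/// name and the `Utf8` type here are assumptions for illustration):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use arrow::datatypes::DataType;
/// # use datafusion_catalog_listing::ListingOptions;
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
/// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::new()))
///     // `date` is not stored in the files; its values come from the directory names
///     .with_table_partition_cols(vec![("date".to_string(), DataType::Utf8)]);
/// ```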
///
/// # See Also
///
/// 1. [`ListingTableConfig`]: Configuration options
/// 1. [`DataSourceExec`]: `ExecutionPlan` used by `ListingTable`
///
/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
///
/// # Caching Metadata
///
/// Some formats, such as Parquet, use the `FileMetadataCache` to cache file
/// metadata that is needed to execute but expensive to read, such as row
/// groups and statistics. The cache is scoped to the `SessionContext` and can
/// be configured via the [runtime config options].
///
/// [runtime config options]: https://datafusion.apache.org/user-guide/configs.html#runtime-configuration-settings
///
/// # Example: Read a directory of parquet files using a [`ListingTable`]
///
/// ```no_run
/// # use datafusion_common::Result;
/// # use std::sync::Arc;
/// # use datafusion_catalog::TableProvider;
/// # use datafusion_catalog_listing::{ListingOptions, ListingTable, ListingTableConfig};
/// # use datafusion_datasource::ListingTableUrl;
/// # use datafusion_datasource_parquet::file_format::ParquetFormat;
/// # use datafusion_catalog::Session;
/// async fn get_listing_table(session: &dyn Session) -> Result<Arc<dyn TableProvider>> {
///     let table_path = "/path/to/parquet";
///
///     // Parse the path
///     let table_path = ListingTableUrl::parse(table_path)?;
///
///     // Create default parquet options
///     let file_format = ParquetFormat::new();
///     let listing_options = ListingOptions::new(Arc::new(file_format))
///         .with_file_extension(".parquet");
///
///     // Resolve the schema
///     let resolved_schema = listing_options
///         .infer_schema(session, &table_path)
///         .await?;
///
///     let config = ListingTableConfig::new(table_path)
///         .with_listing_options(listing_options)
///         .with_schema(resolved_schema);
///
///     // Create a new TableProvider
///     let provider = Arc::new(ListingTable::try_new(config)?);
///
/// #   Ok(provider)
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ListingTable {
    table_paths: Vec<ListingTableUrl>,
    /// `file_schema` contains only the columns physically stored in the data files themselves.
    /// - Represents the actual fields found in files like Parquet, CSV, etc.
    /// - Used when reading the raw data from files
    file_schema: SchemaRef,
    /// `table_schema` combines `file_schema` + partition columns
    /// - Partition columns are derived from directory paths (not stored in files)
    /// - These are columns like "year=2022/month=01" in paths like `/data/year=2022/month=01/file.parquet`
    table_schema: SchemaRef,
    /// Indicates how the schema was derived (inferred or explicitly specified)
    schema_source: SchemaSource,
    /// Options used to configure the listing table such as the file format
    /// and partitioning information
    options: ListingOptions,
    /// The SQL definition for this table, if any
    definition: Option<String>,
    /// Cache for collected file statistics
    collected_statistics: FileStatisticsCache,
    /// Constraints applied to this table
    constraints: Constraints,
    /// Column default expressions for columns that are not physically present in the data files
    column_defaults: HashMap<String, Expr>,
    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
    expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}

impl ListingTable {
    /// Create new [`ListingTable`]
    ///
    /// See documentation and example on [`ListingTable`] and [`ListingTableConfig`]
    pub fn try_new(config: ListingTableConfig) -> datafusion_common::Result<Self> {
        // Extract schema_source before moving other parts of the config
        let schema_source = config.schema_source();

        let file_schema = config
            .file_schema
            .ok_or_else(|| internal_datafusion_err!("No schema provided."))?;

        let options = config
            .options
            .ok_or_else(|| internal_datafusion_err!("No ListingOptions provided"))?;

        // Add the partition columns to the file schema
        let mut builder = SchemaBuilder::from(file_schema.as_ref().to_owned());
        for (part_col_name, part_col_type) in &options.table_partition_cols {
            builder.push(Field::new(part_col_name, part_col_type.clone(), false));
        }

        let table_schema = Arc::new(
            builder
                .finish()
                .with_metadata(file_schema.metadata().clone()),
        );

        let table = Self {
            table_paths: config.table_paths,
            file_schema,
            table_schema,
            schema_source,
            options,
            definition: None,
            collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
            constraints: Constraints::default(),
            column_defaults: HashMap::new(),
            schema_adapter_factory: config.schema_adapter_factory,
            expr_adapter_factory: config.expr_adapter_factory,
        };

        Ok(table)
    }

    /// Assign constraints
    pub fn with_constraints(mut self, constraints: Constraints) -> Self {
        self.constraints = constraints;
        self
    }

    /// Assign column defaults
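    ///
    /// A minimal sketch (assuming `table` is an existing [`ListingTable`]; the
    /// column name and default value are assumptions for illustration):
    ///
    /// ```rust,ignore
    /// use std::collections::HashMap;
    /// use datafusion_expr::lit;
    ///
    /// // Inserts that do not supply a `year` value fall back to the literal 2024
    /// let table = table.with_column_defaults(HashMap::from([
    ///     ("year".to_string(), lit(2024_i64)),
    /// ]));
    /// ```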
    pub fn with_column_defaults(
        mut self,
        column_defaults: HashMap<String, Expr>,
    ) -> Self {
        self.column_defaults = column_defaults;
        self
    }

    /// Set the [`FileStatisticsCache`] used to cache parquet file statistics.
    ///
    /// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
    /// multiple times in the same session.
    ///
    /// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
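    ///
    /// # Example
    ///
    /// A minimal sketch (assuming `table` is an existing [`ListingTable`] and
    /// `shared_cache` is a [`FileStatisticsCache`] owned by the session):
    ///
    /// ```rust,ignore
    /// // Reuse the session's statistics cache so repeated scans of the same
    /// // files do not re-fetch per-file statistics.
    /// let table = table.with_cache(Some(shared_cache));
    /// ```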
    pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
        self.collected_statistics =
            cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
        self
    }

    /// Specify the SQL definition for this table, if any
    pub fn with_definition(mut self, definition: Option<String>) -> Self {
        self.definition = definition;
        self
    }

    /// Get paths ref
    pub fn table_paths(&self) -> &Vec<ListingTableUrl> {
        &self.table_paths
    }

    /// Get options ref
    pub fn options(&self) -> &ListingOptions {
        &self.options
    }

    /// Get the schema source
    pub fn schema_source(&self) -> SchemaSource {
        self.schema_source
    }

    /// Set the [`SchemaAdapterFactory`] for this [`ListingTable`]
    ///
    /// The schema adapter factory is used to create schema adapters that can
    /// handle schema evolution and type conversions when reading files with
    /// different schemas than the table schema.
    ///
    /// # Example: Adding Schema Evolution Support
    /// ```rust
    /// # use std::sync::Arc;
    /// # use datafusion_catalog_listing::{ListingTable, ListingTableConfig, ListingOptions};
    /// # use datafusion_datasource::ListingTableUrl;
    /// # use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter};
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
    /// # let table_path = ListingTableUrl::parse("file:///path/to/data").unwrap();
    /// # let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
    /// # let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    /// # let config = ListingTableConfig::new(table_path).with_listing_options(options).with_schema(schema);
    /// # let table = ListingTable::try_new(config).unwrap();
    /// let table_with_evolution = table
    ///     .with_schema_adapter_factory(Arc::new(DefaultSchemaAdapterFactory));
    /// ```
    /// See [`ListingTableConfig::with_schema_adapter_factory`] for an example of custom SchemaAdapterFactory.
    pub fn with_schema_adapter_factory(
        self,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Self {
        Self {
            schema_adapter_factory: Some(schema_adapter_factory),
            ..self
        }
    }

    /// Get the [`SchemaAdapterFactory`] for this table
    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.as_ref()
    }

    /// Creates a schema adapter for mapping between file and table schemas
    ///
    /// Uses the configured schema adapter factory if available, otherwise falls back
    /// to the default implementation.
    fn create_schema_adapter(&self) -> Box<dyn SchemaAdapter> {
        let table_schema = self.schema();
        match &self.schema_adapter_factory {
            Some(factory) => {
                factory.create_with_projected_schema(Arc::clone(&table_schema))
            }
            None => DefaultSchemaAdapterFactory::from_schema(Arc::clone(&table_schema)),
        }
    }

    /// Creates a file source and applies schema adapter factory if available
    fn create_file_source_with_schema_adapter(
        &self,
    ) -> datafusion_common::Result<Arc<dyn FileSource>> {
        let mut source = self.options.format.file_source();
        // Apply schema adapter to source if available
        //
        // The source will use this SchemaAdapter to adapt data batches as they flow up the plan.
        // Note: ListingTable also creates a SchemaAdapter in `scan()` but that is only used to adapt collected statistics.
        if let Some(factory) = &self.schema_adapter_factory {
            source = source.with_schema_adapter_factory(Arc::clone(factory))?;
        }
        Ok(source)
    }

    /// If `file_sort_order` is specified, creates the corresponding physical
    /// sort expressions ([`LexOrdering`]) for the declared orderings.
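    ///
    /// A minimal sketch of declaring the pre-existing sort order that this
    /// method turns into physical expressions (the `ts` column is an assumption
    /// for illustration; `listing_options` is a [`ListingOptions`] value):
    ///
    /// ```rust,ignore
    /// use datafusion_expr::col;
    ///
    /// // Declare that every file is already sorted by `ts` ascending, nulls last
    /// let listing_options =
    ///     listing_options.with_file_sort_order(vec![vec![col("ts").sort(true, false)]]);
    /// ```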
    pub fn try_create_output_ordering(
        &self,
        execution_props: &ExecutionProps,
    ) -> datafusion_common::Result<Vec<LexOrdering>> {
        create_lex_ordering(
            &self.table_schema,
            &self.options.file_sort_order,
            execution_props,
        )
    }
}

// An expression can be used for partition pruning if the table has partition
// columns and the expression can be evaluated using only those columns.
// For example, with a partition column `date`, the filter `date = '2024-06-01'`
// qualifies, while a predicate on a regular data column does not.
fn can_be_evaluated_for_partition_pruning(
    partition_column_names: &[&str],
    expr: &Expr,
) -> bool {
    !partition_column_names.is_empty()
        && expr_applicable_for_cols(partition_column_names, expr)
}

#[async_trait]
impl TableProvider for ListingTable {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    fn constraints(&self) -> Option<&Constraints> {
        Some(&self.constraints)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    async fn scan(
        &self,
        state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let options = ScanArgs::default()
            .with_projection(projection.map(|p| p.as_slice()))
            .with_filters(Some(filters))
            .with_limit(limit);
        Ok(self.scan_with_args(state, options).await?.into_inner())
    }

    async fn scan_with_args<'a>(
        &self,
        state: &dyn Session,
        args: ScanArgs<'a>,
    ) -> datafusion_common::Result<ScanResult> {
        let projection = args.projection().map(|p| p.to_vec());
        let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default();
        let limit = args.limit();

        // extract types of partition columns
        let table_partition_cols = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone()))
            .collect::<datafusion_common::Result<Vec<_>>>()?;

        let table_partition_col_names = table_partition_cols
            .iter()
            .map(|field| field.name().as_str())
            .collect::<Vec<_>>();

        // If a filter can be resolved using only the partition columns, there is no
        // need to push it down to the TableScan; otherwise, `unhandled` pruning
        // predicates would be generated
        let (partition_filters, filters): (Vec<_>, Vec<_>) =
            filters.iter().cloned().partition(|filter| {
                can_be_evaluated_for_partition_pruning(&table_partition_col_names, filter)
            });

        // Do not limit the number of partitioned files to scan when filters and a limit
        // are present at the same time, because the limit must be applied after the
        // filters are applied.
        let statistic_file_limit = if filters.is_empty() { limit } else { None };

        let (mut partitioned_file_lists, statistics) = self
            .list_files_for_scan(state, &partition_filters, statistic_file_limit)
            .await?;

        // if no files need to be read, return an `EmptyExec`
        if partitioned_file_lists.is_empty() {
            let projected_schema = project_schema(&self.schema(), projection.as_ref())?;
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(projected_schema))));
        }

        let output_ordering = self.try_create_output_ordering(state.execution_props())?;
        match state
            .config_options()
            .execution
            .split_file_groups_by_statistics
            .then(|| {
                output_ordering.first().map(|output_ordering| {
                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
                        &self.table_schema,
                        &partitioned_file_lists,
                        output_ordering,
                        self.options.target_partitions,
                    )
                })
            })
            .flatten()
        {
            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
            Some(Ok(new_groups)) => {
                if new_groups.len() <= self.options.target_partitions {
                    partitioned_file_lists = new_groups;
                } else {
                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
                }
            }
            None => {} // no ordering required
        };

        let Some(object_store_url) =
            self.table_paths.first().map(ListingTableUrl::object_store)
        else {
            return Ok(ScanResult::new(Arc::new(EmptyExec::new(Arc::new(
                Schema::empty(),
            )))));
        };

        let file_source = self.create_file_source_with_schema_adapter()?;

        // create the execution plan
        let plan = self
            .options
            .format
            .create_physical_plan(
                state,
                FileScanConfigBuilder::new(
                    object_store_url,
                    Arc::clone(&self.file_schema),
                    file_source,
                )
                .with_file_groups(partitioned_file_lists)
                .with_constraints(self.constraints.clone())
                .with_statistics(statistics)
                .with_projection_indices(projection)
                .with_limit(limit)
                .with_output_ordering(output_ordering)
                .with_table_partition_cols(table_partition_cols)
                .with_expr_adapter(self.expr_adapter_factory.clone())
                .build(),
            )
            .await?;

        Ok(ScanResult::new(plan))
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion_common::Result<Vec<TableProviderFilterPushDown>> {
        let partition_column_names = self
            .options
            .table_partition_cols
            .iter()
            .map(|col| col.0.as_str())
            .collect::<Vec<_>>();
        filters
            .iter()
            .map(|filter| {
                if can_be_evaluated_for_partition_pruning(&partition_column_names, filter)
                {
                    // if filter can be handled by partition pruning, it is exact
                    return Ok(TableProviderFilterPushDown::Exact);
                }

                Ok(TableProviderFilterPushDown::Inexact)
            })
            .collect()
    }

    fn get_table_definition(&self) -> Option<&str> {
        self.definition.as_deref()
    }

    async fn insert_into(
        &self,
        state: &dyn Session,
        input: Arc<dyn ExecutionPlan>,
        insert_op: InsertOp,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // Check that the schema of the plan matches the schema of this table.
        self.schema()
            .logically_equivalent_names_and_types(&input.schema())?;

        let table_path = &self.table_paths()[0];
        if !table_path.is_collection() {
            return plan_err!(
                "Inserting into a ListingTable backed by a single file is not supported, URL is possibly missing a trailing `/`. \
                To append to an existing file use StreamTable, e.g. by using CREATE UNBOUNDED EXTERNAL TABLE"
            );
        }

        // Get the object store for the table path.
        let store = state.runtime_env().object_store(table_path)?;

        let file_list_stream = pruned_partition_list(
            state,
            store.as_ref(),
            table_path,
            &[],
            &self.options.file_extension,
            &self.options.table_partition_cols,
        )
        .await?;

        let file_group = file_list_stream.try_collect::<Vec<_>>().await?.into();
        let keep_partition_by_columns =
            state.config_options().execution.keep_partition_by_columns;

        // Sink-related options, apart from the file format
        let config = FileSinkConfig {
            original_url: String::default(),
            object_store_url: self.table_paths()[0].object_store(),
            table_paths: self.table_paths().clone(),
            file_group,
            output_schema: self.schema(),
            table_partition_cols: self.options.table_partition_cols.clone(),
            insert_op,
            keep_partition_by_columns,
            file_extension: self.options().format.get_ext(),
        };

        let orderings = self.try_create_output_ordering(state.execution_props())?;
        // It is sufficient to pass only one of the equivalent orderings:
        let order_requirements = orderings.into_iter().next().map(Into::into);

        self.options()
            .format
            .create_writer_physical_plan(input, state, config, order_requirements)
            .await
    }

    fn get_column_default(&self, column: &str) -> Option<&Expr> {
        self.column_defaults.get(column)
    }
}

impl ListingTable {
    /// Get the list of files for a scan as well as the file level statistics.
    /// The list is grouped to let the execution plan know how the files should
    /// be distributed to different threads / executors.
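    ///
    /// A minimal sketch of how this might be called (`table` and `session` are
    /// assumptions for illustration; `session` is anything implementing
    /// [`Session`]):
    ///
    /// ```rust,ignore
    /// // With no partition filters, stop listing further files once the files
    /// // collected so far already account for more than 100 rows.
    /// let (file_groups, stats) = table
    ///     .list_files_for_scan(&session, &[], Some(100))
    ///     .await?;
    /// ```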
    pub async fn list_files_for_scan<'a>(
        &'a self,
        ctx: &'a dyn Session,
        filters: &'a [Expr],
        limit: Option<usize>,
    ) -> datafusion_common::Result<(Vec<FileGroup>, Statistics)> {
        let store = if let Some(url) = self.table_paths.first() {
            ctx.runtime_env().object_store(url)?
        } else {
            return Ok((vec![], Statistics::new_unknown(&self.file_schema)));
        };
        // list files (with partitions)
        let file_list = future::try_join_all(self.table_paths.iter().map(|table_path| {
            pruned_partition_list(
                ctx,
                store.as_ref(),
                table_path,
                filters,
                &self.options.file_extension,
                &self.options.table_partition_cols,
            )
        }))
        .await?;
        let meta_fetch_concurrency =
            ctx.config_options().execution.meta_fetch_concurrency;
        let file_list = stream::iter(file_list).flatten_unordered(meta_fetch_concurrency);
        // collect the statistics if required by the config
        let files = file_list
            .map(|part_file| async {
                let part_file = part_file?;
                let statistics = if self.options.collect_stat {
                    self.do_collect_statistics(ctx, &store, &part_file).await?
                } else {
                    Arc::new(Statistics::new_unknown(&self.file_schema))
                };
                Ok(part_file.with_statistics(statistics))
            })
            .boxed()
            .buffer_unordered(ctx.config_options().execution.meta_fetch_concurrency);

        let (file_group, inexact_stats) =
            get_files_with_limit(files, limit, self.options.collect_stat).await?;

        let file_groups = file_group.split_files(self.options.target_partitions);
        let (mut file_groups, mut stats) = compute_all_files_statistics(
            file_groups,
            self.schema(),
            self.options.collect_stat,
            inexact_stats,
        )?;

        let schema_adapter = self.create_schema_adapter();
        let (schema_mapper, _) = schema_adapter.map_schema(self.file_schema.as_ref())?;

        stats.column_statistics =
            schema_mapper.map_column_statistics(&stats.column_statistics)?;
        file_groups.iter_mut().try_for_each(|file_group| {
            if let Some(stat) = file_group.statistics_mut() {
                stat.column_statistics =
                    schema_mapper.map_column_statistics(&stat.column_statistics)?;
            }
            Ok::<_, DataFusionError>(())
        })?;
        Ok((file_groups, stats))
    }

    /// Collects statistics for a given partitioned file.
    ///
    /// This method first checks if the statistics for the given file are already cached.
    /// If they are, it returns the cached statistics.
    /// If they are not, it infers the statistics from the file and stores them in the cache.
    async fn do_collect_statistics(
        &self,
        ctx: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        part_file: &PartitionedFile,
    ) -> datafusion_common::Result<Arc<Statistics>> {
        match self
            .collected_statistics
            .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
        {
            Some(statistics) => Ok(statistics),
            None => {
                let statistics = self
                    .options
                    .format
                    .infer_stats(
                        ctx,
                        store,
                        Arc::clone(&self.file_schema),
                        &part_file.object_meta,
                    )
                    .await?;
                let statistics = Arc::new(statistics);
                self.collected_statistics.put_with_extra(
                    &part_file.object_meta.location,
                    Arc::clone(&statistics),
                    &part_file.object_meta,
                );
                Ok(statistics)
            }
        }
    }
}

/// Processes a stream of partitioned files and returns a `FileGroup` containing the files.
///
/// This function collects files from the provided stream until either:
/// 1. The stream is exhausted
/// 2. The accumulated number of rows exceeds the provided `limit` (if specified)
///
/// # Arguments
/// * `files` - A stream of `Result<PartitionedFile>` items to process
/// * `limit` - An optional row count limit. If provided, the function will stop collecting files
///   once the accumulated number of rows exceeds this limit
/// * `collect_stats` - Whether to collect and accumulate statistics from the files
///
/// # Returns
/// A `Result` containing a `FileGroup` with the collected files
/// and a boolean indicating whether the statistics are inexact.
///
/// # Note
/// The function will continue processing files if statistics are not available or if the
/// limit is not provided. If `collect_stats` is false, statistics won't be accumulated
/// but files will still be collected.
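///
/// For example, with 50-row files and `limit = Some(120)`, the first three files
/// are collected (150 accumulated rows) before the limit is considered reached;
/// files still remaining in the stream afterwards cause the statistics to be
/// reported as inexact.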
async fn get_files_with_limit(
    files: impl Stream<Item = datafusion_common::Result<PartitionedFile>>,
    limit: Option<usize>,
    collect_stats: bool,
) -> datafusion_common::Result<(FileGroup, bool)> {
    let mut file_group = FileGroup::default();
    // Fusing the stream allows us to call next safely even once it is finished.
    let mut all_files = Box::pin(files.fuse());
    enum ProcessingState {
        ReadingFiles,
        ReachedLimit,
    }

    let mut state = ProcessingState::ReadingFiles;
    let mut num_rows = Precision::Absent;

    while let Some(file_result) = all_files.next().await {
        // Early exit if we've already reached our limit
        if matches!(state, ProcessingState::ReachedLimit) {
            break;
        }

        let file = file_result?;

        // Update file statistics regardless of state
        if collect_stats {
            if let Some(file_stats) = &file.statistics {
                num_rows = if file_group.is_empty() {
                    // For the first file, just take its row count
                    file_stats.num_rows
                } else {
                    // For subsequent files, accumulate the counts
                    num_rows.add(&file_stats.num_rows)
                };
            }
        }

        // Always add the file to our group
        file_group.push(file);

        // Check if we've hit the limit (if one was specified)
        if let Some(limit) = limit {
            if let Precision::Exact(row_count) = num_rows {
                if row_count > limit {
                    state = ProcessingState::ReachedLimit;
                }
            }
        }
    }
    // If we still have files in the stream, it means that the limit kicked
    // in, and the statistics could have been different had we processed the
    // files in a different order.
    let inexact_stats = all_files.next().await.is_some();
    Ok((file_group, inexact_stats))
}