datafusion/datasource/listing/table.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::execution::SessionState;
use async_trait::async_trait;
use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
use datafusion_common::{config_datafusion_err, internal_datafusion_err};
use datafusion_session::Session;
use futures::StreamExt;
use std::collections::HashMap;

/// Extension trait for [`ListingTableConfig`] that supports inferring options and schemas
///
/// This trait exists because the following inference methods only
/// work for [`SessionState`] implementations of [`Session`].
/// See [`ListingTableConfig`] for the remaining inference methods.
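///
/// # Example
///
/// A minimal sketch of how the inference methods might be chained, assuming a
/// `SessionContext` named `ctx` and a readable `file:///data/table/` directory
/// (both are placeholders, not part of this crate):
///
/// ```ignore
/// use datafusion_datasource::ListingTableUrl;
///
/// let table_path = ListingTableUrl::parse("file:///data/table/")?;
/// // Infer the file format options and then the schema in one call.
/// let config = ListingTableConfig::new(table_path)
///     .infer(&ctx.state())
///     .await?;
/// ```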
#[async_trait]
pub trait ListingTableConfigExt {
    /// Infer `ListingOptions` based on `table_path` and file suffix.
    ///
    /// The format is inferred based on the first `table_path`.
    async fn infer_options(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig>;

    /// Convenience method to call both [`Self::infer_options`] and [`ListingTableConfig::infer_schema`]
    async fn infer(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig>;
}

#[async_trait]
impl ListingTableConfigExt for ListingTableConfig {
    async fn infer_options(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig> {
        let store = if let Some(url) = self.table_paths.first() {
            state.runtime_env().object_store(url)?
        } else {
            return Ok(self);
        };

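        // Sample the first file under the first table path; its file name is
        // used below to infer the file extension and any compression suffix.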
        let file = self
            .table_paths
            .first()
            .unwrap()
            .list_all_files(state, store.as_ref(), "")
            .await?
            .next()
            .await
            .ok_or_else(|| internal_datafusion_err!("No files for table"))??;

        let (file_extension, maybe_compression_type) =
            ListingTableConfig::infer_file_extension_and_compression_type(
                file.location.as_ref(),
            )?;

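        // Forward any detected compression to the format factory via the
        // "format.compression" option so the created `FileFormat` is
        // configured for it.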
        let mut format_options = HashMap::new();
        if let Some(ref compression_type) = maybe_compression_type {
            format_options
                .insert("format.compression".to_string(), compression_type.clone());
        }
        // These inference helpers require a full `SessionState`; see the trait docs.
        let state = state
            .as_any()
            .downcast_ref::<SessionState>()
            .ok_or_else(|| internal_datafusion_err!("infer_options requires a SessionState"))?;
        let file_format = state
            .get_file_format_factory(&file_extension)
            .ok_or(config_datafusion_err!(
                "No file_format found with extension {file_extension}"
            ))?
            .create(state, &format_options)?;

        let listing_file_extension =
            if let Some(compression_type) = maybe_compression_type {
                format!("{}.{}", &file_extension, &compression_type)
            } else {
                file_extension
            };

        let listing_options = ListingOptions::new(file_format)
            .with_file_extension(listing_file_extension)
            .with_target_partitions(state.config().target_partitions())
            .with_collect_stat(state.config().collect_statistics());

        Ok(self.with_listing_options(listing_options))
    }

    async fn infer(self, state: &dyn Session) -> datafusion_common::Result<Self> {
        self.infer_options(state).await?.infer_schema(state).await
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "parquet")]
    use crate::datasource::file_format::parquet::ParquetFormat;
    use crate::datasource::listing::table::ListingTableConfigExt;
    use crate::prelude::*;
    use crate::{
        datasource::{
            file_format::csv::CsvFormat, file_format::json::JsonFormat,
            provider_as_source, DefaultTableSource, MemTable,
        },
        execution::options::ArrowReadOptions,
        test::{
            columns, object_store::ensure_head_concurrency,
            object_store::make_test_store_and_state, object_store::register_test_store,
        },
    };
    use arrow::{compute::SortOptions, record_batch::RecordBatch};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use datafusion_catalog::TableProvider;
    use datafusion_catalog_listing::{
        ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
    };
    use datafusion_common::{
        assert_contains, plan_err,
        stats::Precision,
        test_util::{batches_to_string, datafusion_test_data},
        ColumnStatistics, DataFusionError, Result, ScalarValue,
    };
    use datafusion_datasource::file_compression_type::FileCompressionType;
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource::schema_adapter::{
        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };
    use datafusion_datasource::ListingTableUrl;
    use datafusion_expr::dml::InsertOp;
    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
    use datafusion_physical_expr::expressions::binary;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;
    use datafusion_physical_plan::empty::EmptyExec;
    use datafusion_physical_plan::{collect, ExecutionPlanProperties};
    use rstest::rstest;
    use std::collections::HashMap;
    use std::io::Write;
    use std::sync::Arc;
    use tempfile::TempDir;
    use url::Url;

    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);

    /// Creates a test schema with standard field types used in tests
    fn create_test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Float32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Boolean, true),
            Field::new("c4", DataType::Utf8, true),
        ]))
    }

    /// Helper function to generate test file paths with a given prefix and count, starting at index 0
    fn generate_test_files(prefix: &str, count: usize) -> Vec<String> {
        generate_test_files_with_start(prefix, count, 0)
    }

    /// Helper function to generate test file paths with given prefix, count, and start index
    fn generate_test_files_with_start(
        prefix: &str,
        count: usize,
        start_index: usize,
    ) -> Vec<String> {
        (start_index..start_index + count)
            .map(|i| format!("{prefix}/file{i}"))
            .collect()
    }

    #[tokio::test]
    async fn test_schema_source_tracking_comprehensive() -> Result<()> {
        let ctx = SessionContext::new();
        let testdata = datafusion_test_data();
        let filename = format!("{testdata}/aggregate_simple.csv");
        let table_path = ListingTableUrl::parse(filename)?;

        // Test default schema source
        let format = CsvFormat::default();
        let options = ListingOptions::new(Arc::new(format));
        let config =
            ListingTableConfig::new(table_path.clone()).with_listing_options(options);
        assert_eq!(config.schema_source(), SchemaSource::Unset);

        // Test schema source after setting a schema explicitly
        let provided_schema = create_test_schema();
        let config_with_schema = config.clone().with_schema(provided_schema.clone());
        assert_eq!(config_with_schema.schema_source(), SchemaSource::Specified);

        // Test schema source after inferring schema
        assert_eq!(config.schema_source(), SchemaSource::Unset);

        let config_with_inferred = config.infer_schema(&ctx.state()).await?;
        assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred);

        // Test schema preservation through operations
        let config_with_schema_and_options = config_with_schema.clone();
        assert_eq!(
            config_with_schema_and_options.schema_source(),
            SchemaSource::Specified
        );

        // Make sure inferred schema doesn't override specified schema
        let config_with_schema_and_infer = config_with_schema_and_options
            .clone()
            .infer(&ctx.state())
            .await?;
        assert_eq!(
            config_with_schema_and_infer.schema_source(),
            SchemaSource::Specified
        );

        // Verify sources in actual ListingTable objects
        let table_specified = ListingTable::try_new(config_with_schema_and_options)?;
        assert_eq!(table_specified.schema_source(), SchemaSource::Specified);

        let table_inferred = ListingTable::try_new(config_with_inferred)?;
        assert_eq!(table_inferred.schema_source(), SchemaSource::Inferred);

        Ok(())
    }

    #[tokio::test]
    async fn read_single_file() -> Result<()> {
        let ctx = SessionContext::new_with_config(
            SessionConfig::new().with_collect_statistics(true),
        );

        let table = load_table(&ctx, "alltypes_plain.parquet").await?;
        let projection = None;
        let exec = table
            .scan(&ctx.state(), projection, &[], None)
            .await
            .expect("Scan table");

        assert_eq!(exec.children().len(), 0);
        assert_eq!(exec.output_partitioning().partition_count(), 1);

        // test metadata
        assert_eq!(
            exec.partition_statistics(None)?.num_rows,
            Precision::Exact(8)
        );
        assert_eq!(
            exec.partition_statistics(None)?.total_byte_size,
            Precision::Exact(671)
        );

        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn test_try_create_output_ordering() {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let table_path = ListingTableUrl::parse(filename).unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();
        let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema = options.infer_schema(&state, &table_path).await.unwrap();

        use crate::datasource::file_format::parquet::ParquetFormat;
        use datafusion_physical_plan::expressions::col as physical_col;
        use datafusion_physical_plan::expressions::lit as physical_lit;
        use std::ops::Add;

        // (file_sort_order, expected_result)
        let cases = vec![
            (
                vec![],
                Ok::<Vec<LexOrdering>, DataFusionError>(Vec::<LexOrdering>::new()),
            ),
            // sort expr, but non column
            (
                vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
                Ok(vec![[PhysicalSortExpr {
                    expr: binary(
                        physical_col("int_col", &schema).unwrap(),
                        Operator::Plus,
                        physical_lit(1),
                        &schema,
                    )
                    .unwrap(),
                    options: SortOptions {
                        descending: false,
                        nulls_first: true,
                    },
                }]
                .into()]),
            ),
            // ok with one column
            (
                vec![vec![col("string_col").sort(true, false)]],
                Ok(vec![[PhysicalSortExpr {
                    expr: physical_col("string_col", &schema).unwrap(),
                    options: SortOptions {
                        descending: false,
                        nulls_first: false,
                    },
                }]
                .into()]),
            ),
            // ok with two columns, different options
            (
                vec![vec![
                    col("string_col").sort(true, false),
                    col("int_col").sort(false, true),
                ]],
                Ok(vec![[
                    PhysicalSortExpr::new_default(
                        physical_col("string_col", &schema).unwrap(),
                    )
                    .asc()
                    .nulls_last(),
                    PhysicalSortExpr::new_default(
                        physical_col("int_col", &schema).unwrap(),
                    )
                    .desc()
                    .nulls_first(),
                ]
                .into()]),
            ),
        ];

        for (file_sort_order, expected_result) in cases {
            let options = options.clone().with_file_sort_order(file_sort_order);

            let config = ListingTableConfig::new(table_path.clone())
                .with_listing_options(options)
                .with_schema(schema.clone());

            let table =
                ListingTable::try_new(config.clone()).expect("Creating the table");
            let ordering_result =
                table.try_create_output_ordering(state.execution_props());

            match (expected_result, ordering_result) {
                (Ok(expected), Ok(result)) => {
                    assert_eq!(expected, result);
                }
                (Err(expected), Err(result)) => {
                    // can't compare the DataFusionError directly
                    let result = result.to_string();
                    let expected = expected.to_string();
                    assert_contains!(result.to_string(), expected);
                }
                (expected_result, ordering_result) => {
                    panic!(
                        "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
                    );
                }
            }
        }
    }

    #[tokio::test]
    async fn read_empty_table() -> Result<()> {
        let ctx = SessionContext::new();
        let path = String::from("table/p1=v1/file.json");
        register_test_store(&ctx, &[(&path, 100)]);

        let format = JsonFormat::default();
        let ext = format.get_ext();

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension(ext)
            .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
            .with_target_partitions(4);

        let table_path = ListingTableUrl::parse("test:///table/")?;
        let file_schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(file_schema);
        let table = ListingTable::try_new(config)?;

        assert_eq!(
            columns(&table.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );

        // this will filter out the only file in the store
        let filter = Expr::not_eq(col("p1"), lit("v1"));

        let scan = table
            .scan(&ctx.state(), None, &[filter], None)
            .await
            .expect("Empty execution plan");

        assert!(scan.as_any().is::<EmptyExec>());
        assert_eq!(
            columns(&scan.schema()),
            vec!["a".to_owned(), "p1".to_owned()]
        );

        Ok(())
    }

    async fn load_table(
        ctx: &SessionContext,
        name: &str,
    ) -> Result<Arc<dyn TableProvider>> {
        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{testdata}/{name}");
        let table_path = ListingTableUrl::parse(filename)?;

        let config = ListingTableConfig::new(table_path)
            .infer(&ctx.state())
            .await?;
        let table = ListingTable::try_new(config)?;
        Ok(Arc::new(table))
    }

    /// Check that the files listed by the table match the specified `output_partitioning`
    /// when the object store contains `files`.
    async fn assert_list_files_for_scan_grouping(
        files: &[&str],
        table_prefix: &str,
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_path = ListingTableUrl::parse(table_prefix)?;
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    /// Check that the files listed by the table match the specified `output_partitioning`
    /// when the object store contains `files`.
    async fn assert_list_files_for_multi_paths(
        files: &[&str],
        table_prefix: &[&str],
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_paths = table_prefix
            .iter()
            .map(|t| ListingTableUrl::parse(t).unwrap())
            .collect();
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    /// Check that the files listed by the table match the specified `output_partitioning`
    /// when the object store contains `files`, and validate that file metadata is fetched
    /// concurrently
    async fn assert_list_files_for_exact_paths(
        files: &[&str],
        target_partitions: usize,
        output_partitioning: usize,
        file_ext: Option<&str>,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        let (store, _) = make_test_store_and_state(
            &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
        );

        let meta_fetch_concurrency = ctx
            .state()
            .config_options()
            .execution
            .meta_fetch_concurrency;
        let expected_concurrency = files.len().min(meta_fetch_concurrency);
        let head_concurrency_store = ensure_head_concurrency(store, expected_concurrency);

        let url = Url::parse("test://").unwrap();
        ctx.register_object_store(&url, head_concurrency_store.clone());

        let format = JsonFormat::default();

        let opt = ListingOptions::new(Arc::new(format))
            .with_file_extension_opt(file_ext)
            .with_target_partitions(target_partitions);

        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);

        let table_paths = files
            .iter()
            .map(|t| ListingTableUrl::parse(format!("test:///{t}")).unwrap())
            .collect();
        let config = ListingTableConfig::new_with_multi_paths(table_paths)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(file_list.len(), output_partitioning);

        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults() -> Result<()> {
        helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
        helper_test_insert_into_sql(
            "csv",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(HashMap::from([("has_header".into(), "true".into())])),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_json_defaults() -> Result<()> {
        helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
            .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            None,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        helper_test_insert_into_sql(
            "parquet",
            FileCompressionType::UNCOMPRESSED,
            "",
            Some(config_map),
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.soft_max_rows_per_output_file".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd(5)".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_enabled".into(),
            "false".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.dictionary_page_size_limit".into(),
            "100".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.statistics_enabled".into(),
            "none".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_statistics_size".into(),
            "10".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.max_row_group_size".into(),
            "5".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.created_by".into(),
            "datafusion test".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.column_index_truncate_length".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.data_page_row_count_limit".into(),
            "50".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.encoding".into(),
            "delta_binary_packed".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_on_write".into(),
            "true".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_fpp".into(),
            "0.01".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.bloom_filter_ndv".into(),
            "1000".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.writer_version".into(),
            "2.0".into(),
        );
        config_map.insert(
            "datafusion.execution.parquet.write_batch_size".into(),
            "5".into(),
        );
        config_map.insert("datafusion.execution.batch_size".into(), "10".into());
        helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await?;
        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
    ) -> Result<()> {
        let mut config_map: HashMap<String, String> = HashMap::new();
        config_map.insert(
            "datafusion.execution.parquet.compression".into(),
            "zstd".into(),
        );
        let e = helper_test_append_new_files_to_table(
            ParquetFormat::default().get_ext(),
            FileCompressionType::UNCOMPRESSED,
            Some(config_map),
            2,
        )
        .await
        .expect_err("Example should fail!");
        assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");

        Ok(())
    }

    async fn helper_test_append_new_files_to_table(
        file_type_ext: String,
        file_compression_type: FileCompressionType,
        session_config_map: Option<HashMap<String, String>>,
        expected_n_files_per_insert: usize,
    ) -> Result<()> {
        // Create the initial context, schema, and batch.
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // Create a new schema with one field called "column1" of type Int32
        let schema = Arc::new(Schema::new(vec![Field::new(
            "column1",
            DataType::Int32,
            false,
        )]));

        let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
            Box::new(Expr::Column("column1".into())),
            Operator::GtEq,
            Box::new(Expr::Literal(ScalarValue::Int32(Some(0)), None)),
        ));

        // Create a new batch of data to insert into the table
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(arrow::array::Int32Array::from(vec![
                1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
            ]))],
        )?;

        // Register appropriate table depending on file_type we want to test
        let tmp_dir = TempDir::new()?;
        match file_type_ext.as_str() {
            "csv" => {
                session_ctx
                    .register_csv(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        CsvReadOptions::new()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            "json" => {
                session_ctx
                    .register_json(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        NdJsonReadOptions::default()
                            .schema(schema.as_ref())
                            .file_compression_type(file_compression_type),
                    )
                    .await?;
            }
            #[cfg(feature = "parquet")]
            "parquet" => {
                session_ctx
                    .register_parquet(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ParquetReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            #[cfg(feature = "avro")]
            "avro" => {
                session_ctx
                    .register_avro(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        AvroReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            "arrow" => {
                session_ctx
                    .register_arrow(
                        "t",
                        tmp_dir.path().to_str().unwrap(),
                        ArrowReadOptions::default().schema(schema.as_ref()),
                    )
                    .await?;
            }
            _ => panic!("Unrecognized file extension {file_type_ext}"),
        }

        // Create and register the source table with the provided schema and inserted data
        let source_table = Arc::new(MemTable::try_new(
            schema.clone(),
            vec![vec![batch.clone(), batch.clone()]],
        )?);
        session_ctx.register_table("source", source_table.clone())?;
        // Convert the source table into a provider so that it can be used in a query
        let source = provider_as_source(source_table);
        let target = session_ctx.table_provider("t").await?;
        let target = Arc::new(DefaultTableSource::new(target));
        // Create a table scan logical plan to read from the source table
        let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
            .filter(filter_predicate)?
            .build()?;
        // Since the logical plan contains a filter, increasing parallelism is helpful.
        // Therefore, we will have 8 partitions in the final plan.
        // Create an insert plan to insert the source data into the initial table
        let insert_into_table =
            LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
                .build()?;
        // Create a physical plan from the insert plan
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;
        // Execute the physical plan and collect the results
        let res = collect(plan, session_ctx.task_ctx()).await?;
        // Insert returns the number of rows written, in our case this would be 20.

        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
            +-------+
            | count |
            +-------+
            | 20    |
            +-------+
        "###);}

        // Read the records in the table
        let batches = session_ctx
            .sql("select count(*) as count from t")
            .await?
            .collect()
            .await?;

        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
            +-------+
            | count |
            +-------+
            | 20    |
            +-------+
        "###);}

        // Assert that `expected_n_files_per_insert` files were added to the table.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert);

        // Create a physical plan from the insert plan
        let plan = session_ctx
            .state()
            .create_physical_plan(&insert_into_table)
            .await?;

        // Again, execute the physical plan and collect the results
        let res = collect(plan, session_ctx.task_ctx()).await?;

        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
            +-------+
            | count |
            +-------+
            | 20    |
            +-------+
        "###);}

        // Read the contents of the table
        let batches = session_ctx
            .sql("select count(*) AS count from t")
            .await?
            .collect()
            .await?;

        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
            +-------+
            | count |
            +-------+
            | 40    |
            +-------+
        "###);}

        // Assert that another `expected_n_files_per_insert` files were added to the table.
        let num_files = tmp_dir.path().read_dir()?.count();
        assert_eq!(num_files, expected_n_files_per_insert * 2);

        Ok(())
    }

    /// Tests INSERT INTO with end-to-end SQL:
    /// CREATE EXTERNAL TABLE + INSERT INTO statements
    async fn helper_test_insert_into_sql(
        file_type: &str,
        // TODO test with create statement options such as compression
        _file_compression_type: FileCompressionType,
        external_table_options: &str,
        session_config_map: Option<HashMap<String, String>>,
    ) -> Result<()> {
        // Create the initial context
        let session_ctx = match session_config_map {
            Some(cfg) => {
                let config = SessionConfig::from_string_hash_map(&cfg)?;
                SessionContext::new_with_config(config)
            }
            None => SessionContext::new(),
        };

        // create table
        let tmp_dir = TempDir::new()?;
        let str_path = tmp_dir
            .path()
            .to_str()
            .expect("Temp path should convert to &str");
        session_ctx
            .sql(&format!(
                "create external table foo(a varchar, b varchar, c int) \
                        stored as {file_type} \
                        location '{str_path}' \
                        {external_table_options}"
            ))
            .await?
            .collect()
            .await?;

        // insert data
        session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
            .await?
            .collect()
            .await?;

        // check the inserted rows
        let batches = session_ctx
            .sql("select * from foo")
            .await?
            .collect()
            .await?;

        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
            +-----+-----+---+
            | a   | b   | c |
            +-----+-----+---+
            | foo | bar | 1 |
            | foo | bar | 2 |
            | foo | bar | 3 |
            +-----+-----+---+
        "###);}

        Ok(())
    }

    #[tokio::test]
    async fn test_infer_options_compressed_csv() -> Result<()> {
        let testdata = crate::test_util::arrow_test_data();
        let filename = format!("{testdata}/csv/aggregate_test_100.csv.gz");
        let table_path = ListingTableUrl::parse(filename)?;

        let ctx = SessionContext::new();

        let config = ListingTableConfig::new(table_path);
        let config_with_opts = config.infer_options(&ctx.state()).await?;
        let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;

        let schema = config_with_schema.file_schema.unwrap();

        assert_eq!(schema.fields.len(), 13);

        Ok(())
    }

    #[tokio::test]
    async fn infer_preserves_provided_schema() -> Result<()> {
        let ctx = SessionContext::new();

        let testdata = datafusion_test_data();
        let filename = format!("{testdata}/aggregate_simple.csv");
        let table_path = ListingTableUrl::parse(filename)?;

        let provided_schema = create_test_schema();

        let format = CsvFormat::default();
        let options = ListingOptions::new(Arc::new(format));
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(options)
            .with_schema(Arc::clone(&provided_schema));

        let config = config.infer(&ctx.state()).await?;

        assert_eq!(*config.file_schema.unwrap(), *provided_schema);

        Ok(())
    }

    #[tokio::test]
    async fn test_listing_table_config_with_multiple_files_comprehensive() -> Result<()> {
        let ctx = SessionContext::new();

        // Create test files with different schemas
        let tmp_dir = TempDir::new()?;
        let file_path1 = tmp_dir.path().join("file1.csv");
        let file_path2 = tmp_dir.path().join("file2.csv");

        // File 1: c1,c2,c3
        let mut file1 = std::fs::File::create(&file_path1)?;
        writeln!(file1, "c1,c2,c3")?;
        writeln!(file1, "1,2,3")?;
        writeln!(file1, "4,5,6")?;

        // File 2: c1,c2,c3,c4
        let mut file2 = std::fs::File::create(&file_path2)?;
        writeln!(file2, "c1,c2,c3,c4")?;
        writeln!(file2, "7,8,9,10")?;
        writeln!(file2, "11,12,13,14")?;

        // Parse paths
        let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
        let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;

        // Create format and options
        let format = CsvFormat::default().with_has_header(true);
        let options = ListingOptions::new(Arc::new(format));

        // Test case 1: Infer schema using first file's schema
        let config1 = ListingTableConfig::new_with_multi_paths(vec![
            table_path1.clone(),
            table_path2.clone(),
        ])
        .with_listing_options(options.clone());
        let config1 = config1.infer_schema(&ctx.state()).await?;
        assert_eq!(config1.schema_source(), SchemaSource::Inferred);

        // Verify schema matches first file
        let schema1 = config1.file_schema.as_ref().unwrap().clone();
        assert_eq!(schema1.fields().len(), 3);
        assert_eq!(schema1.field(0).name(), "c1");
        assert_eq!(schema1.field(1).name(), "c2");
        assert_eq!(schema1.field(2).name(), "c3");

        // Test case 2: Use specified schema with 3 columns
        let schema_3cols = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Utf8, true),
            Field::new("c3", DataType::Utf8, true),
        ]));

        let config2 = ListingTableConfig::new_with_multi_paths(vec![
            table_path1.clone(),
            table_path2.clone(),
        ])
        .with_listing_options(options.clone())
        .with_schema(schema_3cols);
        let config2 = config2.infer_schema(&ctx.state()).await?;
        assert_eq!(config2.schema_source(), SchemaSource::Specified);

        // Verify that the schema is still the one we specified (3 columns)
        let schema2 = config2.file_schema.as_ref().unwrap().clone();
        assert_eq!(schema2.fields().len(), 3);
        assert_eq!(schema2.field(0).name(), "c1");
        assert_eq!(schema2.field(1).name(), "c2");
        assert_eq!(schema2.field(2).name(), "c3");

        // Test case 3: Use specified schema with 4 columns
        let schema_4cols = Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Utf8, true),
            Field::new("c2", DataType::Utf8, true),
            Field::new("c3", DataType::Utf8, true),
            Field::new("c4", DataType::Utf8, true),
        ]));

        let config3 = ListingTableConfig::new_with_multi_paths(vec![
            table_path1.clone(),
            table_path2.clone(),
        ])
        .with_listing_options(options.clone())
        .with_schema(schema_4cols);
        let config3 = config3.infer_schema(&ctx.state()).await?;
        assert_eq!(config3.schema_source(), SchemaSource::Specified);

        // Verify that the schema is still the one we specified (4 columns)
        let schema3 = config3.file_schema.as_ref().unwrap().clone();
        assert_eq!(schema3.fields().len(), 4);
        assert_eq!(schema3.field(0).name(), "c1");
        assert_eq!(schema3.field(1).name(), "c2");
        assert_eq!(schema3.field(2).name(), "c3");
        assert_eq!(schema3.field(3).name(), "c4");

        // Test case 4: Verify order matters when inferring schema
        let config4 = ListingTableConfig::new_with_multi_paths(vec![
            table_path2.clone(),
            table_path1.clone(),
        ])
        .with_listing_options(options);
        let config4 = config4.infer_schema(&ctx.state()).await?;

        // Should use first file's schema, which now has 4 columns
        let schema4 = config4.file_schema.as_ref().unwrap().clone();
        assert_eq!(schema4.fields().len(), 4);
        assert_eq!(schema4.field(0).name(), "c1");
        assert_eq!(schema4.field(1).name(), "c2");
        assert_eq!(schema4.field(2).name(), "c3");
        assert_eq!(schema4.field(3).name(), "c4");

        Ok(())
    }

    #[tokio::test]
    async fn test_list_files_configurations() -> Result<()> {
        // Define common test cases as (description, files, paths, target_partitions, expected_partitions, file_ext)
        let test_cases = vec![
            // Single path cases
            (
                "Single path, more partitions than files",
                generate_test_files("bucket/key-prefix", 5),
                vec!["test:///bucket/key-prefix/"],
                12,
                5,
                Some(""),
            ),
            (
                "Single path, equal partitions and files",
                generate_test_files("bucket/key-prefix", 4),
                vec!["test:///bucket/key-prefix/"],
                4,
                4,
                Some(""),
            ),
            (
                "Single path, more files than partitions",
                generate_test_files("bucket/key-prefix", 5),
                vec!["test:///bucket/key-prefix/"],
                2,
                2,
                Some(""),
            ),
            // Multi path cases
            (
                "Multi path, more partitions than files",
                {
                    let mut files = generate_test_files("bucket/key1", 3);
                    files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
                    files.extend(generate_test_files_with_start("bucket/key3", 1, 5));
                    files
                },
                vec!["test:///bucket/key1/", "test:///bucket/key2/"],
                12,
                5,
                Some(""),
            ),
            // No files case
            (
                "No files",
                vec![],
                vec!["test:///bucket/key-prefix/"],
                2,
                0,
                Some(""),
            ),
            // Exact path cases
            (
                "Exact paths test",
                {
                    let mut files = generate_test_files("bucket/key1", 3);
                    files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
                    files
                },
                vec![
                    "test:///bucket/key1/file0",
                    "test:///bucket/key1/file1",
                    "test:///bucket/key1/file2",
                    "test:///bucket/key2/file3",
                    "test:///bucket/key2/file4",
                ],
                12,
                5,
                Some(""),
            ),
        ];

        // Run each test case
        for (test_name, files, paths, target_partitions, expected_partitions, file_ext) in
            test_cases
        {
            println!("Running test: {test_name}");

            if files.is_empty() {
                // Test empty files case
                assert_list_files_for_multi_paths(
                    &[],
                    &paths,
                    target_partitions,
                    expected_partitions,
                    file_ext,
                )
                .await?;
            } else if paths.len() == 1 {
                // Test using single path API
                let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
                assert_list_files_for_scan_grouping(
                    &file_refs,
                    paths[0],
                    target_partitions,
                    expected_partitions,
                    file_ext,
                )
                .await?;
            } else if paths[0].contains("test:///bucket/key") {
                // Test using multi path API
                let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
                assert_list_files_for_multi_paths(
                    &file_refs,
                    &paths,
                    target_partitions,
                    expected_partitions,
                    file_ext,
                )
                .await?;
            } else {
                // Test using exact path API for specific cases
                let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
                assert_list_files_for_exact_paths(
                    &file_refs,
                    target_partitions,
                    expected_partitions,
                    file_ext,
                )
                .await?;
            }
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> {
        let files = [
            "bucket/test/pid=1/file1",
            "bucket/test/pid=1/file2",
            "bucket/test/pid=2/file3",
            "bucket/test/pid=2/file4",
            "bucket/test/other/file5",
        ];

        let ctx = SessionContext::new();
        register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());

        let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
            .with_file_extension_opt(Some(""))
            .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]);

        let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap();
        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));

        let table = ListingTable::try_new(config)?;

        let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
        assert_eq!(file_list.len(), 1);

        let files = file_list[0].clone();

        assert_eq!(
            files
                .iter()
                .map(|f| f.path().to_string())
                .collect::<Vec<_>>(),
            vec![
                "bucket/test/pid=1/file1",
                "bucket/test/pid=1/file2",
                "bucket/test/pid=2/file3",
                "bucket/test/pid=2/file4",
            ]
        );

        Ok(())
    }

    #[cfg(feature = "parquet")]
    #[tokio::test]
    async fn test_table_stats_behaviors() -> Result<()> {
        use crate::datasource::file_format::parquet::ParquetFormat;

        let testdata = crate::test_util::parquet_test_data();
        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
        let table_path = ListingTableUrl::parse(filename)?;

        let ctx = SessionContext::new();
        let state = ctx.state();

        // Test 1: Default behavior - stats not collected
        let opt_default = ListingOptions::new(Arc::new(ParquetFormat::default()));
        let schema_default = opt_default.infer_schema(&state, &table_path).await?;
        let config_default = ListingTableConfig::new(table_path.clone())
            .with_listing_options(opt_default)
            .with_schema(schema_default);

        let table_default = ListingTable::try_new(config_default)?;

        let exec_default = table_default.scan(&state, None, &[], None).await?;
        assert_eq!(
            exec_default.partition_statistics(None)?.num_rows,
            Precision::Absent
        );

        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
        assert_eq!(
            exec_default.partition_statistics(None)?.total_byte_size,
            Precision::Absent
        );

        // Test 2: Explicitly disable stats
        let opt_disabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_collect_stat(false);
        let schema_disabled = opt_disabled.infer_schema(&state, &table_path).await?;
        let config_disabled = ListingTableConfig::new(table_path.clone())
            .with_listing_options(opt_disabled)
            .with_schema(schema_disabled);
        let table_disabled = ListingTable::try_new(config_disabled)?;

        let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
        assert_eq!(
            exec_disabled.partition_statistics(None)?.num_rows,
            Precision::Absent
        );
        assert_eq!(
            exec_disabled.partition_statistics(None)?.total_byte_size,
            Precision::Absent
        );

        // Test 3: Explicitly enable stats
        let opt_enabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
            .with_collect_stat(true);
        let schema_enabled = opt_enabled.infer_schema(&state, &table_path).await?;
        let config_enabled = ListingTableConfig::new(table_path)
            .with_listing_options(opt_enabled)
            .with_schema(schema_enabled);
        let table_enabled = ListingTable::try_new(config_enabled)?;

        let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
        assert_eq!(
            exec_enabled.partition_statistics(None)?.num_rows,
            Precision::Exact(8)
        );
        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
        assert_eq!(
            exec_enabled.partition_statistics(None)?.total_byte_size,
            Precision::Exact(671)
        );

        Ok(())
    }

    #[tokio::test]
    async fn test_insert_into_parameterized() -> Result<()> {
        let test_cases = vec![
            // (file_format, batch_size, soft_max_rows, expected_files)
            ("json", 10, 10, 2),
            ("csv", 10, 10, 2),
            #[cfg(feature = "parquet")]
            ("parquet", 10, 10, 2),
            #[cfg(feature = "parquet")]
            ("parquet", 20, 20, 1),
        ];

        for (format, batch_size, soft_max_rows, expected_files) in test_cases {
            println!("Testing insert with format: {format}, batch_size: {batch_size}, expected files: {expected_files}");

            let mut config_map = HashMap::new();
            config_map.insert(
                "datafusion.execution.batch_size".into(),
                batch_size.to_string(),
            );
            config_map.insert(
                "datafusion.execution.soft_max_rows_per_output_file".into(),
                soft_max_rows.to_string(),
            );

            let file_extension = match format {
                "json" => JsonFormat::default().get_ext(),
                "csv" => CsvFormat::default().get_ext(),
                #[cfg(feature = "parquet")]
                "parquet" => ParquetFormat::default().get_ext(),
                _ => unreachable!("Unsupported format"),
            };

            helper_test_append_new_files_to_table(
                file_extension,
                FileCompressionType::UNCOMPRESSED,
                Some(config_map),
                expected_files,
            )
            .await?;
        }

        Ok(())
    }

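    /// Verifies that a custom `SchemaAdapterFactory` is applied when listing
    /// files for a scan: `NullStatsAdapterFactory` rewrites every column's
    /// null count to `DUMMY_NULL_COUNT`, and that value must appear in both
    /// the table-level and per-file statistics.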
    #[tokio::test]
    async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
        let ctx = SessionContext::new();
        let table = create_test_listing_table_with_json_and_adapter(
            &ctx,
            false,
            // NullStatsAdapterFactory sets column_statistics null_count to DUMMY_NULL_COUNT
            Arc::new(NullStatsAdapterFactory {}),
        )?;

        let (groups, stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;

        assert_eq!(stats.column_statistics[0].null_count, DUMMY_NULL_COUNT);
        for g in groups {
            if let Some(s) = g.file_statistics(None) {
                assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
            }
        }

        Ok(())
    }

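    /// Verifies that a `ListingTable` built without an explicit schema
    /// adapter factory still scans and lists files correctly using the
    /// default adapter.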
    #[tokio::test]
    async fn test_statistics_mapping_with_default_factory() -> Result<()> {
        let ctx = SessionContext::new();

        // Create a table without providing a custom schema adapter factory;
        // it should fall back to the DefaultSchemaAdapterFactory
        let path = "table/file.json";
        register_test_store(&ctx, &[(path, 10)]);

        let format = JsonFormat::default();
        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false);
        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
        let table_path = ListingTableUrl::parse("test:///table/")?;

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema));
        // Note: NOT calling .with_schema_adapter_factory() to test default behavior

        let table = ListingTable::try_new(config)?;

        // Verify that no custom schema adapter factory is set
        assert!(table.schema_adapter_factory().is_none());

        // The scan should work correctly with the default schema adapter
        let scan_result = table.scan(&ctx.state(), None, &[], None).await;
        assert!(
            scan_result.is_ok(),
            "Scan should succeed with default schema adapter"
        );

        // Verify that the default adapter handles basic schema compatibility
        let (groups, _stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
        assert!(
            !groups.is_empty(),
            "Should list files successfully with default adapter"
        );

        Ok(())
    }

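    /// Each case configures `FailingMapSchemaAdapterFactory` with a different
    /// failure mode and checks that the corresponding error message surfaces
    /// from `scan`.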
    #[rstest]
    #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
    #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
    #[case(
        MapSchemaError::InvalidProjection,
        "Invalid projection in schema mapping"
    )]
    #[tokio::test]
    async fn test_schema_adapter_map_schema_errors(
        #[case] error_type: MapSchemaError,
        #[case] expected_error_msg: &str,
    ) -> Result<()> {
        let ctx = SessionContext::new();
        let table = create_test_listing_table_with_json_and_adapter(
            &ctx,
            false,
            Arc::new(FailingMapSchemaAdapterFactory { error_type }),
        )?;

        // The error should bubble up from the scan operation when schema mapping fails
        let scan_result = table.scan(&ctx.state(), None, &[], None).await;

        assert!(scan_result.is_err());
        let error_msg = scan_result.unwrap_err().to_string();
        assert!(
            error_msg.contains(expected_error_msg),
            "Expected error containing '{expected_error_msg}', got: {error_msg}"
        );

        Ok(())
    }

    // Test that errors during file listing also bubble up correctly
    #[tokio::test]
    async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
        let ctx = SessionContext::new();
        let table = create_test_listing_table_with_json_and_adapter(
            &ctx,
            true,
            Arc::new(FailingMapSchemaAdapterFactory {
                error_type: MapSchemaError::TypeIncompatible,
            }),
        )?;

        // The error should bubble up from list_files_for_scan when collecting statistics
        let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;

        assert!(list_result.is_err());
        let error_msg = list_result.unwrap_err().to_string();
        assert!(
            error_msg.contains("Cannot map incompatible types"),
            "Expected type incompatibility error during file listing, got: {error_msg}"
        );

        Ok(())
    }

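    /// Failure modes that `FailingMapSchemaAdapter` can be configured to
    /// return from `map_schema`.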
    #[derive(Debug, Copy, Clone)]
    enum MapSchemaError {
        TypeIncompatible,
        GeneralFailure,
        InvalidProjection,
    }

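    /// Factory that builds `FailingMapSchemaAdapter`s preconfigured with a
    /// specific `MapSchemaError`.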
    #[derive(Debug)]
    struct FailingMapSchemaAdapterFactory {
        error_type: MapSchemaError,
    }

    impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
        fn create(
            &self,
            projected_table_schema: SchemaRef,
            _table_schema: SchemaRef,
        ) -> Box<dyn SchemaAdapter> {
            Box::new(FailingMapSchemaAdapter {
                schema: projected_table_schema,
                error_type: self.error_type,
            })
        }
    }

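    /// Schema adapter whose `map_schema` always returns a planning error,
    /// used to exercise error propagation in the tests above.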
    #[derive(Debug)]
    struct FailingMapSchemaAdapter {
        schema: SchemaRef,
        error_type: MapSchemaError,
    }

    impl SchemaAdapter for FailingMapSchemaAdapter {
        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
            let field = self.schema.field(index);
            file_schema.fields.find(field.name()).map(|(i, _)| i)
        }

        fn map_schema(
            &self,
            _file_schema: &Schema,
        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
            // Always fail; which error is returned depends on the configured error_type
            match self.error_type {
                MapSchemaError::TypeIncompatible => {
                    plan_err!(
                        "Cannot map incompatible types: Boolean cannot be cast to Utf8"
                    )
                }
                MapSchemaError::GeneralFailure => {
                    plan_err!("Schema adapter mapping failed due to internal error")
                }
                MapSchemaError::InvalidProjection => {
                    plan_err!("Invalid projection in schema mapping: column index out of bounds")
                }
            }
        }
    }

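    /// Factory for `NullStatsAdapter`, which passes batches through unchanged
    /// but overrides the reported column null counts.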
    #[derive(Debug)]
    struct NullStatsAdapterFactory;

    impl SchemaAdapterFactory for NullStatsAdapterFactory {
        fn create(
            &self,
            projected_table_schema: SchemaRef,
            _table_schema: SchemaRef,
        ) -> Box<dyn SchemaAdapter> {
            Box::new(NullStatsAdapter {
                schema: projected_table_schema,
            })
        }
    }

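    /// Schema adapter that maps columns by name and installs a
    /// `NullStatsMapper` for statistics mapping.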
    #[derive(Debug)]
    struct NullStatsAdapter {
        schema: SchemaRef,
    }

    impl SchemaAdapter for NullStatsAdapter {
        fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
            let field = self.schema.field(index);
            file_schema.fields.find(field.name()).map(|(i, _)| i)
        }

        fn map_schema(
            &self,
            file_schema: &Schema,
        ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
            let projection = (0..file_schema.fields().len()).collect();
            Ok((Arc::new(NullStatsMapper {}), projection))
        }
    }

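    /// `SchemaMapper` that leaves record batches untouched and sets every
    /// column's null count to `DUMMY_NULL_COUNT`.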
    #[derive(Debug)]
    struct NullStatsMapper;

    impl SchemaMapper for NullStatsMapper {
        fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
            Ok(batch)
        }

        fn map_column_statistics(
            &self,
            stats: &[ColumnStatistics],
        ) -> Result<Vec<ColumnStatistics>> {
            Ok(stats
                .iter()
                .map(|s| {
                    let mut s = s.clone();
                    s.null_count = DUMMY_NULL_COUNT;
                    s
                })
                .collect())
        }
    }

    /// Helper function to create a test ListingTable with JSON format and custom schema adapter factory
    fn create_test_listing_table_with_json_and_adapter(
        ctx: &SessionContext,
        collect_stat: bool,
        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<ListingTable> {
        let path = "table/file.json";
        register_test_store(ctx, &[(path, 10)]);

        let format = JsonFormat::default();
        let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
        let table_path = ListingTableUrl::parse("test:///table/")?;

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(Arc::new(schema))
            .with_schema_adapter_factory(schema_adapter_factory);

        ListingTable::try_new(config)
    }
}