use crate::execution::SessionState;
use async_trait::async_trait;
use datafusion_catalog_listing::{ListingOptions, ListingTableConfig};
use datafusion_common::{config_datafusion_err, internal_datafusion_err};
use datafusion_session::Session;
use futures::StreamExt;
use std::collections::HashMap;

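/// Extension methods for [`ListingTableConfig`] that infer missing parts of the
/// configuration (file format, listing options, and schema) from the files behind the
/// table paths, using a [`Session`].
///
/// The sketch below is illustrative only and is not taken from this crate's docs: the
/// `datafusion`-facade import paths and the `example.parquet` file are assumptions,
/// which is why the example is marked `ignore`.
///
/// ```ignore
/// use datafusion::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
/// use datafusion::datasource::listing::table::ListingTableConfigExt;
/// use datafusion::prelude::SessionContext;
///
/// async fn infer_table() -> datafusion::error::Result<ListingTable> {
///     let ctx = SessionContext::new();
///     // Infer the file format, listing options, and schema from the files themselves.
///     let config = ListingTableConfig::new(ListingTableUrl::parse("example.parquet")?)
///         .infer(&ctx.state())
///         .await?;
///     ListingTable::try_new(config)
/// }
/// ```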
#[async_trait]
pub trait ListingTableConfigExt {
    /// Infer the [`ListingOptions`] for this configuration (file format, file
    /// extension, and compression type) by inspecting the first file found under
    /// the table paths.
    async fn infer_options(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig>;

    /// Infer both the [`ListingOptions`] and the table schema; equivalent to
    /// [`Self::infer_options`] followed by [`ListingTableConfig::infer_schema`].
    async fn infer(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig>;
}

#[async_trait]
impl ListingTableConfigExt for ListingTableConfig {
    async fn infer_options(
        self,
        state: &dyn Session,
    ) -> datafusion_common::Result<ListingTableConfig> {
        // Nothing to infer if there are no table paths.
        let Some(url) = self.table_paths.first() else {
            return Ok(self);
        };
        let store = state.runtime_env().object_store(url)?;

        // Use the first file found under the table path to infer the format.
        let file = url
            .list_all_files(state, store.as_ref(), "")
            .await?
            .next()
            .await
            .ok_or_else(|| internal_datafusion_err!("No files for table"))??;

        let (file_extension, maybe_compression_type) =
            ListingTableConfig::infer_file_extension_and_compression_type(
                file.location.as_ref(),
            )?;

        let mut format_options = HashMap::new();
        if let Some(ref compression_type) = maybe_compression_type {
            format_options
                .insert("format.compression".to_string(), compression_type.clone());
        }
        // File format factories are registered on the concrete `SessionState`, so the
        // generic `Session` must be downcast; return an error rather than panicking.
        let state = state
            .as_any()
            .downcast_ref::<SessionState>()
            .ok_or_else(|| {
                internal_datafusion_err!("infer_options requires a SessionState")
            })?;
        let file_format = state
            .get_file_format_factory(&file_extension)
            .ok_or(config_datafusion_err!(
                "No file_format found with extension {file_extension}"
            ))?
            .create(state, &format_options)?;

        // For compressed files the listing extension includes the compression
        // suffix, e.g. `csv.gz`.
        let listing_file_extension =
            if let Some(compression_type) = maybe_compression_type {
                format!("{}.{}", &file_extension, &compression_type)
            } else {
                file_extension
            };

        let listing_options = ListingOptions::new(file_format)
            .with_file_extension(listing_file_extension)
            .with_target_partitions(state.config().target_partitions())
            .with_collect_stat(state.config().collect_statistics());

        Ok(self.with_listing_options(listing_options))
    }

    async fn infer(self, state: &dyn Session) -> datafusion_common::Result<Self> {
        self.infer_options(state).await?.infer_schema(state).await
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "parquet")]
    use crate::datasource::file_format::parquet::ParquetFormat;
    use crate::datasource::listing::table::ListingTableConfigExt;
    use crate::prelude::*;
    use crate::{
        datasource::{
            file_format::csv::CsvFormat, file_format::json::JsonFormat,
            provider_as_source, DefaultTableSource, MemTable,
        },
        execution::options::ArrowReadOptions,
        test::{
            columns, object_store::ensure_head_concurrency,
            object_store::make_test_store_and_state, object_store::register_test_store,
        },
    };
    use arrow::{compute::SortOptions, record_batch::RecordBatch};
    use arrow_schema::{DataType, Field, Schema, SchemaRef};
    use datafusion_catalog::TableProvider;
    use datafusion_catalog_listing::{
        ListingOptions, ListingTable, ListingTableConfig, SchemaSource,
    };
    use datafusion_common::{
        assert_contains, plan_err,
        stats::Precision,
        test_util::{batches_to_string, datafusion_test_data},
        ColumnStatistics, DataFusionError, Result, ScalarValue,
    };
    use datafusion_datasource::file_compression_type::FileCompressionType;
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource::schema_adapter::{
        SchemaAdapter, SchemaAdapterFactory, SchemaMapper,
    };
    use datafusion_datasource::ListingTableUrl;
    use datafusion_expr::dml::InsertOp;
    use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
    use datafusion_physical_expr::expressions::binary;
    use datafusion_physical_expr::PhysicalSortExpr;
    use datafusion_physical_expr_common::sort_expr::LexOrdering;
    use datafusion_physical_plan::empty::EmptyExec;
    use datafusion_physical_plan::{collect, ExecutionPlanProperties};
    use rstest::rstest;
    use std::collections::HashMap;
    use std::io::Write;
    use std::sync::Arc;
    use tempfile::TempDir;
    use url::Url;

    const DUMMY_NULL_COUNT: Precision<usize> = Precision::Exact(42);

    fn create_test_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![
            Field::new("c1", DataType::Float32, true),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::Boolean, true),
            Field::new("c4", DataType::Utf8, true),
        ]))
    }

    fn generate_test_files(prefix: &str, count: usize) -> Vec<String> {
        generate_test_files_with_start(prefix, count, 0)
    }

    fn generate_test_files_with_start(
        prefix: &str,
        count: usize,
        start_index: usize,
    ) -> Vec<String> {
        (start_index..start_index + count)
            .map(|i| format!("{prefix}/file{i}"))
            .collect()
    }

185 #[tokio::test]
186 async fn test_schema_source_tracking_comprehensive() -> Result<()> {
187 let ctx = SessionContext::new();
188 let testdata = datafusion_test_data();
189 let filename = format!("{testdata}/aggregate_simple.csv");
190 let table_path = ListingTableUrl::parse(filename)?;
191
192 let format = CsvFormat::default();
194 let options = ListingOptions::new(Arc::new(format));
195 let config =
196 ListingTableConfig::new(table_path.clone()).with_listing_options(options);
197 assert_eq!(config.schema_source(), SchemaSource::Unset);
198
199 let provided_schema = create_test_schema();
201 let config_with_schema = config.clone().with_schema(provided_schema.clone());
202 assert_eq!(config_with_schema.schema_source(), SchemaSource::Specified);
203
204 assert_eq!(config.schema_source(), SchemaSource::Unset);
206
207 let config_with_inferred = config.infer_schema(&ctx.state()).await?;
208 assert_eq!(config_with_inferred.schema_source(), SchemaSource::Inferred);
209
210 let config_with_schema_and_options = config_with_schema.clone();
212 assert_eq!(
213 config_with_schema_and_options.schema_source(),
214 SchemaSource::Specified
215 );
216
217 let config_with_schema_and_infer = config_with_schema_and_options
219 .clone()
220 .infer(&ctx.state())
221 .await?;
222 assert_eq!(
223 config_with_schema_and_infer.schema_source(),
224 SchemaSource::Specified
225 );
226
227 let table_specified = ListingTable::try_new(config_with_schema_and_options)?;
229 assert_eq!(table_specified.schema_source(), SchemaSource::Specified);
230
231 let table_inferred = ListingTable::try_new(config_with_inferred)?;
232 assert_eq!(table_inferred.schema_source(), SchemaSource::Inferred);
233
234 Ok(())
235 }
236
237 #[tokio::test]
238 async fn read_single_file() -> Result<()> {
239 let ctx = SessionContext::new_with_config(
240 SessionConfig::new().with_collect_statistics(true),
241 );
242
243 let table = load_table(&ctx, "alltypes_plain.parquet").await?;
244 let projection = None;
245 let exec = table
246 .scan(&ctx.state(), projection, &[], None)
247 .await
248 .expect("Scan table");
249
250 assert_eq!(exec.children().len(), 0);
251 assert_eq!(exec.output_partitioning().partition_count(), 1);
252
253 assert_eq!(
255 exec.partition_statistics(None)?.num_rows,
256 Precision::Exact(8)
257 );
258 assert_eq!(
259 exec.partition_statistics(None)?.total_byte_size,
260 Precision::Exact(671)
261 );
262
263 Ok(())
264 }
265
266 #[cfg(feature = "parquet")]
267 #[tokio::test]
268 async fn test_try_create_output_ordering() {
269 let testdata = crate::test_util::parquet_test_data();
270 let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
271 let table_path = ListingTableUrl::parse(filename).unwrap();
272
273 let ctx = SessionContext::new();
274 let state = ctx.state();
275 let options = ListingOptions::new(Arc::new(ParquetFormat::default()));
276 let schema = options.infer_schema(&state, &table_path).await.unwrap();
277
278 use crate::datasource::file_format::parquet::ParquetFormat;
279 use datafusion_physical_plan::expressions::col as physical_col;
280 use datafusion_physical_plan::expressions::lit as physical_lit;
281 use std::ops::Add;
282
283 let cases = vec![
285 (
286 vec![],
287 Ok::<Vec<LexOrdering>, DataFusionError>(Vec::<LexOrdering>::new()),
288 ),
289 (
291 vec![vec![col("int_col").add(lit(1)).sort(true, true)]],
292 Ok(vec![[PhysicalSortExpr {
293 expr: binary(
294 physical_col("int_col", &schema).unwrap(),
295 Operator::Plus,
296 physical_lit(1),
297 &schema,
298 )
299 .unwrap(),
300 options: SortOptions {
301 descending: false,
302 nulls_first: true,
303 },
304 }]
305 .into()]),
306 ),
307 (
309 vec![vec![col("string_col").sort(true, false)]],
310 Ok(vec![[PhysicalSortExpr {
311 expr: physical_col("string_col", &schema).unwrap(),
312 options: SortOptions {
313 descending: false,
314 nulls_first: false,
315 },
316 }]
317 .into()]),
318 ),
319 (
321 vec![vec![
322 col("string_col").sort(true, false),
323 col("int_col").sort(false, true),
324 ]],
325 Ok(vec![[
326 PhysicalSortExpr::new_default(
327 physical_col("string_col", &schema).unwrap(),
328 )
329 .asc()
330 .nulls_last(),
331 PhysicalSortExpr::new_default(
332 physical_col("int_col", &schema).unwrap(),
333 )
334 .desc()
335 .nulls_first(),
336 ]
337 .into()]),
338 ),
339 ];
340
341 for (file_sort_order, expected_result) in cases {
342 let options = options.clone().with_file_sort_order(file_sort_order);
343
344 let config = ListingTableConfig::new(table_path.clone())
345 .with_listing_options(options)
346 .with_schema(schema.clone());
347
348 let table =
349 ListingTable::try_new(config.clone()).expect("Creating the table");
350 let ordering_result =
351 table.try_create_output_ordering(state.execution_props());
352
353 match (expected_result, ordering_result) {
354 (Ok(expected), Ok(result)) => {
355 assert_eq!(expected, result);
356 }
                (Err(expected), Err(result)) => {
                    let result = result.to_string();
                    let expected = expected.to_string();
                    assert_contains!(result, expected);
                }
363 (expected_result, ordering_result) => {
364 panic!(
365 "expected: {expected_result:#?}\n\nactual:{ordering_result:#?}"
366 );
367 }
368 }
369 }
370 }
371
372 #[tokio::test]
373 async fn read_empty_table() -> Result<()> {
374 let ctx = SessionContext::new();
375 let path = String::from("table/p1=v1/file.json");
376 register_test_store(&ctx, &[(&path, 100)]);
377
378 let format = JsonFormat::default();
379 let ext = format.get_ext();
380
381 let opt = ListingOptions::new(Arc::new(format))
382 .with_file_extension(ext)
383 .with_table_partition_cols(vec![(String::from("p1"), DataType::Utf8)])
384 .with_target_partitions(4);
385
386 let table_path = ListingTableUrl::parse("test:///table/")?;
387 let file_schema =
388 Arc::new(Schema::new(vec![Field::new("a", DataType::Boolean, false)]));
389 let config = ListingTableConfig::new(table_path)
390 .with_listing_options(opt)
391 .with_schema(file_schema);
392 let table = ListingTable::try_new(config)?;
393
394 assert_eq!(
395 columns(&table.schema()),
396 vec!["a".to_owned(), "p1".to_owned()]
397 );
398
399 let filter = Expr::not_eq(col("p1"), lit("v1"));
401
402 let scan = table
403 .scan(&ctx.state(), None, &[filter], None)
404 .await
405 .expect("Empty execution plan");
406
407 assert!(scan.as_any().is::<EmptyExec>());
408 assert_eq!(
409 columns(&scan.schema()),
410 vec!["a".to_owned(), "p1".to_owned()]
411 );
412
413 Ok(())
414 }
415
416 async fn load_table(
417 ctx: &SessionContext,
418 name: &str,
419 ) -> Result<Arc<dyn TableProvider>> {
420 let testdata = crate::test_util::parquet_test_data();
421 let filename = format!("{testdata}/{name}");
422 let table_path = ListingTableUrl::parse(filename)?;
423
424 let config = ListingTableConfig::new(table_path)
425 .infer(&ctx.state())
426 .await?;
427 let table = ListingTable::try_new(config)?;
428 Ok(Arc::new(table))
429 }
430
431 async fn assert_list_files_for_scan_grouping(
434 files: &[&str],
435 table_prefix: &str,
436 target_partitions: usize,
437 output_partitioning: usize,
438 file_ext: Option<&str>,
439 ) -> Result<()> {
440 let ctx = SessionContext::new();
441 register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
442
443 let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
444 .with_file_extension_opt(file_ext)
445 .with_target_partitions(target_partitions);
446
447 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
448
449 let table_path = ListingTableUrl::parse(table_prefix)?;
450 let config = ListingTableConfig::new(table_path)
451 .with_listing_options(opt)
452 .with_schema(Arc::new(schema));
453
454 let table = ListingTable::try_new(config)?;
455
456 let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
457
458 assert_eq!(file_list.len(), output_partitioning);
459
460 Ok(())
461 }
462
463 async fn assert_list_files_for_multi_paths(
466 files: &[&str],
467 table_prefix: &[&str],
468 target_partitions: usize,
469 output_partitioning: usize,
470 file_ext: Option<&str>,
471 ) -> Result<()> {
472 let ctx = SessionContext::new();
473 register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
474
475 let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
476 .with_file_extension_opt(file_ext)
477 .with_target_partitions(target_partitions);
478
479 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
480
481 let table_paths = table_prefix
482 .iter()
483 .map(|t| ListingTableUrl::parse(t).unwrap())
484 .collect();
485 let config = ListingTableConfig::new_with_multi_paths(table_paths)
486 .with_listing_options(opt)
487 .with_schema(Arc::new(schema));
488
489 let table = ListingTable::try_new(config)?;
490
491 let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
492
493 assert_eq!(file_list.len(), output_partitioning);
494
495 Ok(())
496 }
497
498 async fn assert_list_files_for_exact_paths(
502 files: &[&str],
503 target_partitions: usize,
504 output_partitioning: usize,
505 file_ext: Option<&str>,
506 ) -> Result<()> {
507 let ctx = SessionContext::new();
508 let (store, _) = make_test_store_and_state(
509 &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>(),
510 );
511
512 let meta_fetch_concurrency = ctx
513 .state()
514 .config_options()
515 .execution
516 .meta_fetch_concurrency;
517 let expected_concurrency = files.len().min(meta_fetch_concurrency);
518 let head_concurrency_store = ensure_head_concurrency(store, expected_concurrency);
519
520 let url = Url::parse("test://").unwrap();
521 ctx.register_object_store(&url, head_concurrency_store.clone());
522
523 let format = JsonFormat::default();
524
525 let opt = ListingOptions::new(Arc::new(format))
526 .with_file_extension_opt(file_ext)
527 .with_target_partitions(target_partitions);
528
529 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
530
531 let table_paths = files
532 .iter()
533 .map(|t| ListingTableUrl::parse(format!("test:///{t}")).unwrap())
534 .collect();
535 let config = ListingTableConfig::new_with_multi_paths(table_paths)
536 .with_listing_options(opt)
537 .with_schema(Arc::new(schema));
538
539 let table = ListingTable::try_new(config)?;
540
541 let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
542
543 assert_eq!(file_list.len(), output_partitioning);
544
545 Ok(())
546 }
547
548 #[tokio::test]
549 async fn test_insert_into_sql_csv_defaults() -> Result<()> {
550 helper_test_insert_into_sql("csv", FileCompressionType::UNCOMPRESSED, "", None)
551 .await?;
552 Ok(())
553 }
554
555 #[tokio::test]
556 async fn test_insert_into_sql_csv_defaults_header_row() -> Result<()> {
557 helper_test_insert_into_sql(
558 "csv",
559 FileCompressionType::UNCOMPRESSED,
560 "",
561 Some(HashMap::from([("has_header".into(), "true".into())])),
562 )
563 .await?;
564 Ok(())
565 }
566
567 #[tokio::test]
568 async fn test_insert_into_sql_json_defaults() -> Result<()> {
569 helper_test_insert_into_sql("json", FileCompressionType::UNCOMPRESSED, "", None)
570 .await?;
571 Ok(())
572 }
573
574 #[tokio::test]
575 async fn test_insert_into_sql_parquet_defaults() -> Result<()> {
576 helper_test_insert_into_sql(
577 "parquet",
578 FileCompressionType::UNCOMPRESSED,
579 "",
580 None,
581 )
582 .await?;
583 Ok(())
584 }
585
586 #[tokio::test]
587 async fn test_insert_into_sql_parquet_session_overrides() -> Result<()> {
588 let mut config_map: HashMap<String, String> = HashMap::new();
589 config_map.insert(
590 "datafusion.execution.parquet.compression".into(),
591 "zstd(5)".into(),
592 );
593 config_map.insert(
594 "datafusion.execution.parquet.dictionary_enabled".into(),
595 "false".into(),
596 );
597 config_map.insert(
598 "datafusion.execution.parquet.dictionary_page_size_limit".into(),
599 "100".into(),
600 );
601 config_map.insert(
602 "datafusion.execution.parquet.statistics_enabled".into(),
603 "none".into(),
604 );
605 config_map.insert(
606 "datafusion.execution.parquet.max_statistics_size".into(),
607 "10".into(),
608 );
609 config_map.insert(
610 "datafusion.execution.parquet.max_row_group_size".into(),
611 "5".into(),
612 );
613 config_map.insert(
614 "datafusion.execution.parquet.created_by".into(),
615 "datafusion test".into(),
616 );
617 config_map.insert(
618 "datafusion.execution.parquet.column_index_truncate_length".into(),
619 "50".into(),
620 );
621 config_map.insert(
622 "datafusion.execution.parquet.data_page_row_count_limit".into(),
623 "50".into(),
624 );
625 config_map.insert(
626 "datafusion.execution.parquet.bloom_filter_on_write".into(),
627 "true".into(),
628 );
629 config_map.insert(
630 "datafusion.execution.parquet.bloom_filter_fpp".into(),
631 "0.01".into(),
632 );
633 config_map.insert(
634 "datafusion.execution.parquet.bloom_filter_ndv".into(),
635 "1000".into(),
636 );
637 config_map.insert(
638 "datafusion.execution.parquet.writer_version".into(),
639 "2.0".into(),
640 );
641 config_map.insert(
642 "datafusion.execution.parquet.write_batch_size".into(),
643 "5".into(),
644 );
645 helper_test_insert_into_sql(
646 "parquet",
647 FileCompressionType::UNCOMPRESSED,
648 "",
649 Some(config_map),
650 )
651 .await?;
652 Ok(())
653 }
654
655 #[tokio::test]
656 async fn test_insert_into_append_new_parquet_files_session_overrides() -> Result<()> {
657 let mut config_map: HashMap<String, String> = HashMap::new();
658 config_map.insert(
659 "datafusion.execution.soft_max_rows_per_output_file".into(),
660 "10".into(),
661 );
662 config_map.insert(
663 "datafusion.execution.parquet.compression".into(),
664 "zstd(5)".into(),
665 );
666 config_map.insert(
667 "datafusion.execution.parquet.dictionary_enabled".into(),
668 "false".into(),
669 );
670 config_map.insert(
671 "datafusion.execution.parquet.dictionary_page_size_limit".into(),
672 "100".into(),
673 );
674 config_map.insert(
675 "datafusion.execution.parquet.statistics_enabled".into(),
676 "none".into(),
677 );
678 config_map.insert(
679 "datafusion.execution.parquet.max_statistics_size".into(),
680 "10".into(),
681 );
682 config_map.insert(
683 "datafusion.execution.parquet.max_row_group_size".into(),
684 "5".into(),
685 );
686 config_map.insert(
687 "datafusion.execution.parquet.created_by".into(),
688 "datafusion test".into(),
689 );
690 config_map.insert(
691 "datafusion.execution.parquet.column_index_truncate_length".into(),
692 "50".into(),
693 );
694 config_map.insert(
695 "datafusion.execution.parquet.data_page_row_count_limit".into(),
696 "50".into(),
697 );
698 config_map.insert(
699 "datafusion.execution.parquet.encoding".into(),
700 "delta_binary_packed".into(),
701 );
702 config_map.insert(
703 "datafusion.execution.parquet.bloom_filter_on_write".into(),
704 "true".into(),
705 );
706 config_map.insert(
707 "datafusion.execution.parquet.bloom_filter_fpp".into(),
708 "0.01".into(),
709 );
710 config_map.insert(
711 "datafusion.execution.parquet.bloom_filter_ndv".into(),
712 "1000".into(),
713 );
714 config_map.insert(
715 "datafusion.execution.parquet.writer_version".into(),
716 "2.0".into(),
717 );
718 config_map.insert(
719 "datafusion.execution.parquet.write_batch_size".into(),
720 "5".into(),
721 );
722 config_map.insert("datafusion.execution.batch_size".into(), "10".into());
723 helper_test_append_new_files_to_table(
724 ParquetFormat::default().get_ext(),
725 FileCompressionType::UNCOMPRESSED,
726 Some(config_map),
727 2,
728 )
729 .await?;
730 Ok(())
731 }
732
733 #[tokio::test]
734 async fn test_insert_into_append_new_parquet_files_invalid_session_fails(
735 ) -> Result<()> {
736 let mut config_map: HashMap<String, String> = HashMap::new();
737 config_map.insert(
738 "datafusion.execution.parquet.compression".into(),
739 "zstd".into(),
740 );
741 let e = helper_test_append_new_files_to_table(
742 ParquetFormat::default().get_ext(),
743 FileCompressionType::UNCOMPRESSED,
744 Some(config_map),
745 2,
746 )
747 .await
748 .expect_err("Example should fail!");
749 assert_eq!(e.strip_backtrace(), "Invalid or Unsupported Configuration: zstd compression requires specifying a level such as zstd(4)");
750
751 Ok(())
752 }
753
754 async fn helper_test_append_new_files_to_table(
755 file_type_ext: String,
756 file_compression_type: FileCompressionType,
757 session_config_map: Option<HashMap<String, String>>,
758 expected_n_files_per_insert: usize,
759 ) -> Result<()> {
760 let session_ctx = match session_config_map {
762 Some(cfg) => {
763 let config = SessionConfig::from_string_hash_map(&cfg)?;
764 SessionContext::new_with_config(config)
765 }
766 None => SessionContext::new(),
767 };
768
769 let schema = Arc::new(Schema::new(vec![Field::new(
771 "column1",
772 DataType::Int32,
773 false,
774 )]));
775
776 let filter_predicate = Expr::BinaryExpr(BinaryExpr::new(
777 Box::new(Expr::Column("column1".into())),
778 Operator::GtEq,
779 Box::new(Expr::Literal(ScalarValue::Int32(Some(0)), None)),
780 ));
781
782 let batch = RecordBatch::try_new(
784 schema.clone(),
785 vec![Arc::new(arrow::array::Int32Array::from(vec![
786 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
787 ]))],
788 )?;
789
790 let tmp_dir = TempDir::new()?;
792 match file_type_ext.as_str() {
793 "csv" => {
794 session_ctx
795 .register_csv(
796 "t",
797 tmp_dir.path().to_str().unwrap(),
798 CsvReadOptions::new()
799 .schema(schema.as_ref())
800 .file_compression_type(file_compression_type),
801 )
802 .await?;
803 }
804 "json" => {
805 session_ctx
806 .register_json(
807 "t",
808 tmp_dir.path().to_str().unwrap(),
809 NdJsonReadOptions::default()
810 .schema(schema.as_ref())
811 .file_compression_type(file_compression_type),
812 )
813 .await?;
814 }
815 #[cfg(feature = "parquet")]
816 "parquet" => {
817 session_ctx
818 .register_parquet(
819 "t",
820 tmp_dir.path().to_str().unwrap(),
821 ParquetReadOptions::default().schema(schema.as_ref()),
822 )
823 .await?;
824 }
825 #[cfg(feature = "avro")]
826 "avro" => {
827 session_ctx
828 .register_avro(
829 "t",
830 tmp_dir.path().to_str().unwrap(),
831 AvroReadOptions::default().schema(schema.as_ref()),
832 )
833 .await?;
834 }
835 "arrow" => {
836 session_ctx
837 .register_arrow(
838 "t",
839 tmp_dir.path().to_str().unwrap(),
840 ArrowReadOptions::default().schema(schema.as_ref()),
841 )
842 .await?;
843 }
844 _ => panic!("Unrecognized file extension {file_type_ext}"),
845 }
846
847 let source_table = Arc::new(MemTable::try_new(
849 schema.clone(),
850 vec![vec![batch.clone(), batch.clone()]],
851 )?);
852 session_ctx.register_table("source", source_table.clone())?;
853 let source = provider_as_source(source_table);
855 let target = session_ctx.table_provider("t").await?;
856 let target = Arc::new(DefaultTableSource::new(target));
857 let scan_plan = LogicalPlanBuilder::scan("source", source, None)?
859 .filter(filter_predicate)?
860 .build()?;
861 let insert_into_table =
865 LogicalPlanBuilder::insert_into(scan_plan, "t", target, InsertOp::Append)?
866 .build()?;
867 let plan = session_ctx
869 .state()
870 .create_physical_plan(&insert_into_table)
871 .await?;
872 let res = collect(plan, session_ctx.task_ctx()).await?;
        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
        +-------+
        | count |
        +-------+
        | 20    |
        +-------+
        "###);}
883
884 let batches = session_ctx
886 .sql("select count(*) as count from t")
887 .await?
888 .collect()
889 .await?;
890
        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
        +-------+
        | count |
        +-------+
        | 20    |
        +-------+
        "###);}
898
899 let num_files = tmp_dir.path().read_dir()?.count();
901 assert_eq!(num_files, expected_n_files_per_insert);
902
903 let plan = session_ctx
905 .state()
906 .create_physical_plan(&insert_into_table)
907 .await?;
908
909 let res = collect(plan, session_ctx.task_ctx()).await?;
911
        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&res),@r###"
        +-------+
        | count |
        +-------+
        | 20    |
        +-------+
        "###);}
919
920 let batches = session_ctx
922 .sql("select count(*) AS count from t")
923 .await?
924 .collect()
925 .await?;
926
        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
        +-------+
        | count |
        +-------+
        | 40    |
        +-------+
        "###);}
934
935 let num_files = tmp_dir.path().read_dir()?.count();
937 assert_eq!(num_files, expected_n_files_per_insert * 2);
938
939 Ok(())
941 }
942
943 async fn helper_test_insert_into_sql(
946 file_type: &str,
947 _file_compression_type: FileCompressionType,
949 external_table_options: &str,
950 session_config_map: Option<HashMap<String, String>>,
951 ) -> Result<()> {
952 let session_ctx = match session_config_map {
954 Some(cfg) => {
955 let config = SessionConfig::from_string_hash_map(&cfg)?;
956 SessionContext::new_with_config(config)
957 }
958 None => SessionContext::new(),
959 };
960
961 let tmp_dir = TempDir::new()?;
963 let str_path = tmp_dir
964 .path()
965 .to_str()
966 .expect("Temp path should convert to &str");
967 session_ctx
968 .sql(&format!(
969 "create external table foo(a varchar, b varchar, c int) \
970 stored as {file_type} \
971 location '{str_path}' \
972 {external_table_options}"
973 ))
974 .await?
975 .collect()
976 .await?;
977
978 session_ctx.sql("insert into foo values ('foo', 'bar', 1),('foo', 'bar', 2), ('foo', 'bar', 3)")
980 .await?
981 .collect()
982 .await?;
983
984 let batches = session_ctx
986 .sql("select * from foo")
987 .await?
988 .collect()
989 .await?;
990
        insta::allow_duplicates! {insta::assert_snapshot!(batches_to_string(&batches),@r###"
        +-----+-----+---+
        | a   | b   | c |
        +-----+-----+---+
        | foo | bar | 1 |
        | foo | bar | 2 |
        | foo | bar | 3 |
        +-----+-----+---+
        "###);}
1000
1001 Ok(())
1002 }
1003
1004 #[tokio::test]
1005 async fn test_infer_options_compressed_csv() -> Result<()> {
1006 let testdata = crate::test_util::arrow_test_data();
1007 let filename = format!("{testdata}/csv/aggregate_test_100.csv.gz");
1008 let table_path = ListingTableUrl::parse(filename)?;
1009
1010 let ctx = SessionContext::new();
1011
1012 let config = ListingTableConfig::new(table_path);
1013 let config_with_opts = config.infer_options(&ctx.state()).await?;
1014 let config_with_schema = config_with_opts.infer_schema(&ctx.state()).await?;
1015
1016 let schema = config_with_schema.file_schema.unwrap();
1017
1018 assert_eq!(schema.fields.len(), 13);
1019
1020 Ok(())
1021 }
1022
1023 #[tokio::test]
1024 async fn infer_preserves_provided_schema() -> Result<()> {
1025 let ctx = SessionContext::new();
1026
1027 let testdata = datafusion_test_data();
1028 let filename = format!("{testdata}/aggregate_simple.csv");
1029 let table_path = ListingTableUrl::parse(filename)?;
1030
1031 let provided_schema = create_test_schema();
1032
1033 let format = CsvFormat::default();
1034 let options = ListingOptions::new(Arc::new(format));
1035 let config = ListingTableConfig::new(table_path)
1036 .with_listing_options(options)
1037 .with_schema(Arc::clone(&provided_schema));
1038
1039 let config = config.infer(&ctx.state()).await?;
1040
1041 assert_eq!(*config.file_schema.unwrap(), *provided_schema);
1042
1043 Ok(())
1044 }
1045
1046 #[tokio::test]
1047 async fn test_listing_table_config_with_multiple_files_comprehensive() -> Result<()> {
1048 let ctx = SessionContext::new();
1049
1050 let tmp_dir = TempDir::new()?;
1052 let file_path1 = tmp_dir.path().join("file1.csv");
1053 let file_path2 = tmp_dir.path().join("file2.csv");
1054
1055 let mut file1 = std::fs::File::create(&file_path1)?;
1057 writeln!(file1, "c1,c2,c3")?;
1058 writeln!(file1, "1,2,3")?;
1059 writeln!(file1, "4,5,6")?;
1060
1061 let mut file2 = std::fs::File::create(&file_path2)?;
1063 writeln!(file2, "c1,c2,c3,c4")?;
1064 writeln!(file2, "7,8,9,10")?;
1065 writeln!(file2, "11,12,13,14")?;
1066
1067 let table_path1 = ListingTableUrl::parse(file_path1.to_str().unwrap())?;
1069 let table_path2 = ListingTableUrl::parse(file_path2.to_str().unwrap())?;
1070
1071 let format = CsvFormat::default().with_has_header(true);
1073 let options = ListingOptions::new(Arc::new(format));
1074
1075 let config1 = ListingTableConfig::new_with_multi_paths(vec![
1077 table_path1.clone(),
1078 table_path2.clone(),
1079 ])
1080 .with_listing_options(options.clone());
1081 let config1 = config1.infer_schema(&ctx.state()).await?;
1082 assert_eq!(config1.schema_source(), SchemaSource::Inferred);
1083
1084 let schema1 = config1.file_schema.as_ref().unwrap().clone();
1086 assert_eq!(schema1.fields().len(), 3);
1087 assert_eq!(schema1.field(0).name(), "c1");
1088 assert_eq!(schema1.field(1).name(), "c2");
1089 assert_eq!(schema1.field(2).name(), "c3");
1090
1091 let schema_3cols = Arc::new(Schema::new(vec![
1093 Field::new("c1", DataType::Utf8, true),
1094 Field::new("c2", DataType::Utf8, true),
1095 Field::new("c3", DataType::Utf8, true),
1096 ]));
1097
1098 let config2 = ListingTableConfig::new_with_multi_paths(vec![
1099 table_path1.clone(),
1100 table_path2.clone(),
1101 ])
1102 .with_listing_options(options.clone())
1103 .with_schema(schema_3cols);
1104 let config2 = config2.infer_schema(&ctx.state()).await?;
1105 assert_eq!(config2.schema_source(), SchemaSource::Specified);
1106
1107 let schema2 = config2.file_schema.as_ref().unwrap().clone();
1109 assert_eq!(schema2.fields().len(), 3);
1110 assert_eq!(schema2.field(0).name(), "c1");
1111 assert_eq!(schema2.field(1).name(), "c2");
1112 assert_eq!(schema2.field(2).name(), "c3");
1113
1114 let schema_4cols = Arc::new(Schema::new(vec![
1116 Field::new("c1", DataType::Utf8, true),
1117 Field::new("c2", DataType::Utf8, true),
1118 Field::new("c3", DataType::Utf8, true),
1119 Field::new("c4", DataType::Utf8, true),
1120 ]));
1121
1122 let config3 = ListingTableConfig::new_with_multi_paths(vec![
1123 table_path1.clone(),
1124 table_path2.clone(),
1125 ])
1126 .with_listing_options(options.clone())
1127 .with_schema(schema_4cols);
1128 let config3 = config3.infer_schema(&ctx.state()).await?;
1129 assert_eq!(config3.schema_source(), SchemaSource::Specified);
1130
1131 let schema3 = config3.file_schema.as_ref().unwrap().clone();
1133 assert_eq!(schema3.fields().len(), 4);
1134 assert_eq!(schema3.field(0).name(), "c1");
1135 assert_eq!(schema3.field(1).name(), "c2");
1136 assert_eq!(schema3.field(2).name(), "c3");
1137 assert_eq!(schema3.field(3).name(), "c4");
1138
1139 let config4 = ListingTableConfig::new_with_multi_paths(vec![
1141 table_path2.clone(),
1142 table_path1.clone(),
1143 ])
1144 .with_listing_options(options);
1145 let config4 = config4.infer_schema(&ctx.state()).await?;
1146
1147 let schema4 = config4.file_schema.as_ref().unwrap().clone();
1149 assert_eq!(schema4.fields().len(), 4);
1150 assert_eq!(schema4.field(0).name(), "c1");
1151 assert_eq!(schema4.field(1).name(), "c2");
1152 assert_eq!(schema4.field(2).name(), "c3");
1153 assert_eq!(schema4.field(3).name(), "c4");
1154
1155 Ok(())
1156 }
1157
1158 #[tokio::test]
1159 async fn test_list_files_configurations() -> Result<()> {
1160 let test_cases = vec![
1162 (
1164 "Single path, more partitions than files",
1165 generate_test_files("bucket/key-prefix", 5),
1166 vec!["test:///bucket/key-prefix/"],
1167 12,
1168 5,
1169 Some(""),
1170 ),
1171 (
1172 "Single path, equal partitions and files",
1173 generate_test_files("bucket/key-prefix", 4),
1174 vec!["test:///bucket/key-prefix/"],
1175 4,
1176 4,
1177 Some(""),
1178 ),
1179 (
1180 "Single path, more files than partitions",
1181 generate_test_files("bucket/key-prefix", 5),
1182 vec!["test:///bucket/key-prefix/"],
1183 2,
1184 2,
1185 Some(""),
1186 ),
1187 (
1189 "Multi path, more partitions than files",
1190 {
1191 let mut files = generate_test_files("bucket/key1", 3);
1192 files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
1193 files.extend(generate_test_files_with_start("bucket/key3", 1, 5));
1194 files
1195 },
1196 vec!["test:///bucket/key1/", "test:///bucket/key2/"],
1197 12,
1198 5,
1199 Some(""),
1200 ),
1201 (
1203 "No files",
1204 vec![],
1205 vec!["test:///bucket/key-prefix/"],
1206 2,
1207 0,
1208 Some(""),
1209 ),
1210 (
1212 "Exact paths test",
1213 {
1214 let mut files = generate_test_files("bucket/key1", 3);
1215 files.extend(generate_test_files_with_start("bucket/key2", 2, 3));
1216 files
1217 },
1218 vec![
1219 "test:///bucket/key1/file0",
1220 "test:///bucket/key1/file1",
1221 "test:///bucket/key1/file2",
1222 "test:///bucket/key2/file3",
1223 "test:///bucket/key2/file4",
1224 ],
1225 12,
1226 5,
1227 Some(""),
1228 ),
1229 ];
1230
1231 for (test_name, files, paths, target_partitions, expected_partitions, file_ext) in
1233 test_cases
1234 {
1235 println!("Running test: {test_name}");
1236
1237 if files.is_empty() {
1238 assert_list_files_for_multi_paths(
1240 &[],
1241 &paths,
1242 target_partitions,
1243 expected_partitions,
1244 file_ext,
1245 )
1246 .await?;
1247 } else if paths.len() == 1 {
1248 let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
1250 assert_list_files_for_scan_grouping(
1251 &file_refs,
1252 paths[0],
1253 target_partitions,
1254 expected_partitions,
1255 file_ext,
1256 )
1257 .await?;
1258 } else if paths[0].contains("test:///bucket/key") {
1259 let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
1261 assert_list_files_for_multi_paths(
1262 &file_refs,
1263 &paths,
1264 target_partitions,
1265 expected_partitions,
1266 file_ext,
1267 )
1268 .await?;
1269 } else {
1270 let file_refs: Vec<&str> = files.iter().map(|s| s.as_str()).collect();
1272 assert_list_files_for_exact_paths(
1273 &file_refs,
1274 target_partitions,
1275 expected_partitions,
1276 file_ext,
1277 )
1278 .await?;
1279 }
1280 }
1281
1282 Ok(())
1283 }
1284
1285 #[tokio::test]
1286 async fn test_listing_table_prunes_extra_files_in_hive() -> Result<()> {
1287 let files = [
1288 "bucket/test/pid=1/file1",
1289 "bucket/test/pid=1/file2",
1290 "bucket/test/pid=2/file3",
1291 "bucket/test/pid=2/file4",
1292 "bucket/test/other/file5",
1293 ];
1294
1295 let ctx = SessionContext::new();
1296 register_test_store(&ctx, &files.iter().map(|f| (*f, 10)).collect::<Vec<_>>());
1297
1298 let opt = ListingOptions::new(Arc::new(JsonFormat::default()))
1299 .with_file_extension_opt(Some(""))
1300 .with_table_partition_cols(vec![("pid".to_string(), DataType::Int32)]);
1301
1302 let table_path = ListingTableUrl::parse("test:///bucket/test/").unwrap();
1303 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1304 let config = ListingTableConfig::new(table_path)
1305 .with_listing_options(opt)
1306 .with_schema(Arc::new(schema));
1307
1308 let table = ListingTable::try_new(config)?;
1309
1310 let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1311 assert_eq!(file_list.len(), 1);
1312
1313 let files = file_list[0].clone();
1314
1315 assert_eq!(
1316 files
1317 .iter()
1318 .map(|f| f.path().to_string())
1319 .collect::<Vec<_>>(),
1320 vec![
1321 "bucket/test/pid=1/file1",
1322 "bucket/test/pid=1/file2",
1323 "bucket/test/pid=2/file3",
1324 "bucket/test/pid=2/file4",
1325 ]
1326 );
1327
1328 Ok(())
1329 }
1330
1331 #[cfg(feature = "parquet")]
1332 #[tokio::test]
1333 async fn test_table_stats_behaviors() -> Result<()> {
1334 use crate::datasource::file_format::parquet::ParquetFormat;
1335
1336 let testdata = crate::test_util::parquet_test_data();
1337 let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
1338 let table_path = ListingTableUrl::parse(filename)?;
1339
1340 let ctx = SessionContext::new();
1341 let state = ctx.state();
1342
1343 let opt_default = ListingOptions::new(Arc::new(ParquetFormat::default()));
1345 let schema_default = opt_default.infer_schema(&state, &table_path).await?;
1346 let config_default = ListingTableConfig::new(table_path.clone())
1347 .with_listing_options(opt_default)
1348 .with_schema(schema_default);
1349
1350 let table_default = ListingTable::try_new(config_default)?;
1351
1352 let exec_default = table_default.scan(&state, None, &[], None).await?;
1353 assert_eq!(
1354 exec_default.partition_statistics(None)?.num_rows,
1355 Precision::Absent
1356 );
1357
1358 assert_eq!(
1360 exec_default.partition_statistics(None)?.total_byte_size,
1361 Precision::Absent
1362 );
1363
1364 let opt_disabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
1366 .with_collect_stat(false);
1367 let schema_disabled = opt_disabled.infer_schema(&state, &table_path).await?;
1368 let config_disabled = ListingTableConfig::new(table_path.clone())
1369 .with_listing_options(opt_disabled)
1370 .with_schema(schema_disabled);
1371 let table_disabled = ListingTable::try_new(config_disabled)?;
1372
1373 let exec_disabled = table_disabled.scan(&state, None, &[], None).await?;
1374 assert_eq!(
1375 exec_disabled.partition_statistics(None)?.num_rows,
1376 Precision::Absent
1377 );
1378 assert_eq!(
1379 exec_disabled.partition_statistics(None)?.total_byte_size,
1380 Precision::Absent
1381 );
1382
1383 let opt_enabled = ListingOptions::new(Arc::new(ParquetFormat::default()))
1385 .with_collect_stat(true);
1386 let schema_enabled = opt_enabled.infer_schema(&state, &table_path).await?;
1387 let config_enabled = ListingTableConfig::new(table_path)
1388 .with_listing_options(opt_enabled)
1389 .with_schema(schema_enabled);
1390 let table_enabled = ListingTable::try_new(config_enabled)?;
1391
1392 let exec_enabled = table_enabled.scan(&state, None, &[], None).await?;
1393 assert_eq!(
1394 exec_enabled.partition_statistics(None)?.num_rows,
1395 Precision::Exact(8)
1396 );
1397 assert_eq!(
1399 exec_enabled.partition_statistics(None)?.total_byte_size,
1400 Precision::Exact(671)
1401 );
1402
1403 Ok(())
1404 }
1405
1406 #[tokio::test]
1407 async fn test_insert_into_parameterized() -> Result<()> {
1408 let test_cases = vec![
1409 ("json", 10, 10, 2),
1411 ("csv", 10, 10, 2),
1412 #[cfg(feature = "parquet")]
1413 ("parquet", 10, 10, 2),
1414 #[cfg(feature = "parquet")]
1415 ("parquet", 20, 20, 1),
1416 ];
1417
1418 for (format, batch_size, soft_max_rows, expected_files) in test_cases {
1419 println!("Testing insert with format: {format}, batch_size: {batch_size}, expected files: {expected_files}");
1420
1421 let mut config_map = HashMap::new();
1422 config_map.insert(
1423 "datafusion.execution.batch_size".into(),
1424 batch_size.to_string(),
1425 );
1426 config_map.insert(
1427 "datafusion.execution.soft_max_rows_per_output_file".into(),
1428 soft_max_rows.to_string(),
1429 );
1430
1431 let file_extension = match format {
1432 "json" => JsonFormat::default().get_ext(),
1433 "csv" => CsvFormat::default().get_ext(),
1434 #[cfg(feature = "parquet")]
1435 "parquet" => ParquetFormat::default().get_ext(),
1436 _ => unreachable!("Unsupported format"),
1437 };
1438
1439 helper_test_append_new_files_to_table(
1440 file_extension,
1441 FileCompressionType::UNCOMPRESSED,
1442 Some(config_map),
1443 expected_files,
1444 )
1445 .await?;
1446 }
1447
1448 Ok(())
1449 }
1450
1451 #[tokio::test]
1452 async fn test_statistics_mapping_with_custom_factory() -> Result<()> {
1453 let ctx = SessionContext::new();
1454 let table = create_test_listing_table_with_json_and_adapter(
1455 &ctx,
1456 false,
1457 Arc::new(NullStatsAdapterFactory {}),
1459 )?;
1460
1461 let (groups, stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1462
1463 assert_eq!(stats.column_statistics[0].null_count, DUMMY_NULL_COUNT);
1464 for g in groups {
1465 if let Some(s) = g.file_statistics(None) {
1466 assert_eq!(s.column_statistics[0].null_count, DUMMY_NULL_COUNT);
1467 }
1468 }
1469
1470 Ok(())
1471 }
1472
1473 #[tokio::test]
1474 async fn test_statistics_mapping_with_default_factory() -> Result<()> {
1475 let ctx = SessionContext::new();
1476
1477 let path = "table/file.json";
1480 register_test_store(&ctx, &[(path, 10)]);
1481
1482 let format = JsonFormat::default();
1483 let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(false);
1484 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1485 let table_path = ListingTableUrl::parse("test:///table/")?;
1486
1487 let config = ListingTableConfig::new(table_path)
1488 .with_listing_options(opt)
1489 .with_schema(Arc::new(schema));
1490 let table = ListingTable::try_new(config)?;
1493
1494 assert!(table.schema_adapter_factory().is_none());
1496
1497 let scan_result = table.scan(&ctx.state(), None, &[], None).await;
1499 assert!(
1500 scan_result.is_ok(),
1501 "Scan should succeed with default schema adapter"
1502 );
1503
1504 let (groups, _stats) = table.list_files_for_scan(&ctx.state(), &[], None).await?;
1506 assert!(
1507 !groups.is_empty(),
1508 "Should list files successfully with default adapter"
1509 );
1510
1511 Ok(())
1512 }
1513
1514 #[rstest]
1515 #[case(MapSchemaError::TypeIncompatible, "Cannot map incompatible types")]
1516 #[case(MapSchemaError::GeneralFailure, "Schema adapter mapping failed")]
1517 #[case(
1518 MapSchemaError::InvalidProjection,
1519 "Invalid projection in schema mapping"
1520 )]
1521 #[tokio::test]
1522 async fn test_schema_adapter_map_schema_errors(
1523 #[case] error_type: MapSchemaError,
1524 #[case] expected_error_msg: &str,
1525 ) -> Result<()> {
1526 let ctx = SessionContext::new();
1527 let table = create_test_listing_table_with_json_and_adapter(
1528 &ctx,
1529 false,
1530 Arc::new(FailingMapSchemaAdapterFactory { error_type }),
1531 )?;
1532
1533 let scan_result = table.scan(&ctx.state(), None, &[], None).await;
1535
1536 assert!(scan_result.is_err());
1537 let error_msg = scan_result.unwrap_err().to_string();
1538 assert!(
1539 error_msg.contains(expected_error_msg),
1540 "Expected error containing '{expected_error_msg}', got: {error_msg}"
1541 );
1542
1543 Ok(())
1544 }
1545
1546 #[tokio::test]
1548 async fn test_schema_adapter_error_during_file_listing() -> Result<()> {
1549 let ctx = SessionContext::new();
1550 let table = create_test_listing_table_with_json_and_adapter(
1551 &ctx,
1552 true,
1553 Arc::new(FailingMapSchemaAdapterFactory {
1554 error_type: MapSchemaError::TypeIncompatible,
1555 }),
1556 )?;
1557
1558 let list_result = table.list_files_for_scan(&ctx.state(), &[], None).await;
1560
1561 assert!(list_result.is_err());
1562 let error_msg = list_result.unwrap_err().to_string();
1563 assert!(
1564 error_msg.contains("Cannot map incompatible types"),
1565 "Expected type incompatibility error during file listing, got: {error_msg}"
1566 );
1567
1568 Ok(())
1569 }
1570
1571 #[derive(Debug, Copy, Clone)]
1572 enum MapSchemaError {
1573 TypeIncompatible,
1574 GeneralFailure,
1575 InvalidProjection,
1576 }
1577
1578 #[derive(Debug)]
1579 struct FailingMapSchemaAdapterFactory {
1580 error_type: MapSchemaError,
1581 }
1582
1583 impl SchemaAdapterFactory for FailingMapSchemaAdapterFactory {
1584 fn create(
1585 &self,
1586 projected_table_schema: SchemaRef,
1587 _table_schema: SchemaRef,
1588 ) -> Box<dyn SchemaAdapter> {
1589 Box::new(FailingMapSchemaAdapter {
1590 schema: projected_table_schema,
1591 error_type: self.error_type,
1592 })
1593 }
1594 }
1595
1596 #[derive(Debug)]
1597 struct FailingMapSchemaAdapter {
1598 schema: SchemaRef,
1599 error_type: MapSchemaError,
1600 }
1601
1602 impl SchemaAdapter for FailingMapSchemaAdapter {
1603 fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
1604 let field = self.schema.field(index);
1605 file_schema.fields.find(field.name()).map(|(i, _)| i)
1606 }
1607
1608 fn map_schema(
1609 &self,
1610 _file_schema: &Schema,
1611 ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
1612 match self.error_type {
1614 MapSchemaError::TypeIncompatible => {
1615 plan_err!(
1616 "Cannot map incompatible types: Boolean cannot be cast to Utf8"
1617 )
1618 }
1619 MapSchemaError::GeneralFailure => {
1620 plan_err!("Schema adapter mapping failed due to internal error")
1621 }
1622 MapSchemaError::InvalidProjection => {
1623 plan_err!("Invalid projection in schema mapping: column index out of bounds")
1624 }
1625 }
1626 }
1627 }
1628
1629 #[derive(Debug)]
1630 struct NullStatsAdapterFactory;
1631
1632 impl SchemaAdapterFactory for NullStatsAdapterFactory {
1633 fn create(
1634 &self,
1635 projected_table_schema: SchemaRef,
1636 _table_schema: SchemaRef,
1637 ) -> Box<dyn SchemaAdapter> {
1638 Box::new(NullStatsAdapter {
1639 schema: projected_table_schema,
1640 })
1641 }
1642 }
1643
1644 #[derive(Debug)]
1645 struct NullStatsAdapter {
1646 schema: SchemaRef,
1647 }
1648
1649 impl SchemaAdapter for NullStatsAdapter {
1650 fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
1651 let field = self.schema.field(index);
1652 file_schema.fields.find(field.name()).map(|(i, _)| i)
1653 }
1654
1655 fn map_schema(
1656 &self,
1657 file_schema: &Schema,
1658 ) -> Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
1659 let projection = (0..file_schema.fields().len()).collect();
1660 Ok((Arc::new(NullStatsMapper {}), projection))
1661 }
1662 }
1663
1664 #[derive(Debug)]
1665 struct NullStatsMapper;
1666
1667 impl SchemaMapper for NullStatsMapper {
1668 fn map_batch(&self, batch: RecordBatch) -> Result<RecordBatch> {
1669 Ok(batch)
1670 }
1671
1672 fn map_column_statistics(
1673 &self,
1674 stats: &[ColumnStatistics],
1675 ) -> Result<Vec<ColumnStatistics>> {
1676 Ok(stats
1677 .iter()
1678 .map(|s| {
1679 let mut s = s.clone();
1680 s.null_count = DUMMY_NULL_COUNT;
1681 s
1682 })
1683 .collect())
1684 }
1685 }
1686
1687 fn create_test_listing_table_with_json_and_adapter(
1689 ctx: &SessionContext,
1690 collect_stat: bool,
1691 schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
1692 ) -> Result<ListingTable> {
1693 let path = "table/file.json";
1694 register_test_store(ctx, &[(path, 10)]);
1695
1696 let format = JsonFormat::default();
1697 let opt = ListingOptions::new(Arc::new(format)).with_collect_stat(collect_stat);
1698 let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
1699 let table_path = ListingTableUrl::parse("test:///table/")?;
1700
1701 let config = ListingTableConfig::new(table_path)
1702 .with_listing_options(opt)
1703 .with_schema(Arc::new(schema))
1704 .with_schema_adapter_factory(schema_adapter_factory);
1705
1706 ListingTable::try_new(config)
1707 }
1708}