1use std::sync::Arc;
21
22#[cfg(feature = "avro")]
23use crate::datasource::file_format::avro::AvroFormat;
24
25#[cfg(feature = "parquet")]
26use crate::datasource::file_format::parquet::ParquetFormat;
27
28use crate::datasource::file_format::arrow::ArrowFormat;
29use crate::datasource::file_format::file_compression_type::FileCompressionType;
30use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
31use crate::datasource::listing::ListingTableUrl;
32use crate::datasource::{file_format::csv::CsvFormat, listing::ListingOptions};
33use crate::error::Result;
34use crate::execution::context::{SessionConfig, SessionState};
35
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion_common::config::{ConfigFileDecryptionProperties, TableOptions};
38use datafusion_common::{
39 DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
40 DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
41};
42
43use async_trait::async_trait;
44use datafusion_datasource_json::file_format::JsonFormat;
45use datafusion_expr::SortExpr;
46
47#[derive(Clone)]
54pub struct CsvReadOptions<'a> {
55 pub has_header: bool,
60 pub delimiter: u8,
62 pub quote: u8,
64 pub terminator: Option<u8>,
66 pub escape: Option<u8>,
68 pub comment: Option<u8>,
70 pub newlines_in_values: bool,
78 pub schema: Option<&'a Schema>,
81 pub schema_infer_max_records: usize,
83 pub file_extension: &'a str,
86 pub table_partition_cols: Vec<(String, DataType)>,
88 pub file_compression_type: FileCompressionType,
90 pub file_sort_order: Vec<Vec<SortExpr>>,
92 pub null_regex: Option<String>,
94 pub truncated_rows: bool,
99}
100
101impl Default for CsvReadOptions<'_> {
102 fn default() -> Self {
103 Self::new()
104 }
105}
106
107impl<'a> CsvReadOptions<'a> {
108 pub fn new() -> Self {
110 Self {
111 has_header: true,
112 schema: None,
113 schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
114 delimiter: b',',
115 quote: b'"',
116 terminator: None,
117 escape: None,
118 newlines_in_values: false,
119 file_extension: DEFAULT_CSV_EXTENSION,
120 table_partition_cols: vec![],
121 file_compression_type: FileCompressionType::UNCOMPRESSED,
122 file_sort_order: vec![],
123 comment: None,
124 null_regex: None,
125 truncated_rows: false,
126 }
127 }
128
129 pub fn has_header(mut self, has_header: bool) -> Self {
131 self.has_header = has_header;
132 self
133 }
134
135 pub fn comment(mut self, comment: u8) -> Self {
137 self.comment = Some(comment);
138 self
139 }
140
141 pub fn delimiter(mut self, delimiter: u8) -> Self {
143 self.delimiter = delimiter;
144 self
145 }
146
147 pub fn quote(mut self, quote: u8) -> Self {
149 self.quote = quote;
150 self
151 }
152
153 pub fn terminator(mut self, terminator: Option<u8>) -> Self {
155 self.terminator = terminator;
156 self
157 }
158
159 pub fn escape(mut self, escape: u8) -> Self {
161 self.escape = Some(escape);
162 self
163 }
164
165 pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
173 self.newlines_in_values = newlines_in_values;
174 self
175 }
176
177 pub fn file_extension(mut self, file_extension: &'a str) -> Self {
179 self.file_extension = file_extension;
180 self
181 }
182
183 pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
185 if let Some(d) = delimiter {
186 self.delimiter = d;
187 }
188 self
189 }
190
191 pub fn schema(mut self, schema: &'a Schema) -> Self {
193 self.schema = Some(schema);
194 self
195 }
196
197 pub fn table_partition_cols(
199 mut self,
200 table_partition_cols: Vec<(String, DataType)>,
201 ) -> Self {
202 self.table_partition_cols = table_partition_cols;
203 self
204 }
205
206 pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
208 self.schema_infer_max_records = max_records;
209 self
210 }
211
212 pub fn file_compression_type(
214 mut self,
215 file_compression_type: FileCompressionType,
216 ) -> Self {
217 self.file_compression_type = file_compression_type;
218 self
219 }
220
221 pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
223 self.file_sort_order = file_sort_order;
224 self
225 }
226
227 pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
229 self.null_regex = null_regex;
230 self
231 }
232
233 pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
238 self.truncated_rows = truncated_rows;
239 self
240 }
241}
242
243#[derive(Clone)]
250pub struct ParquetReadOptions<'a> {
251 pub file_extension: &'a str,
254 pub table_partition_cols: Vec<(String, DataType)>,
256 pub parquet_pruning: Option<bool>,
259 pub skip_metadata: Option<bool>,
265 pub schema: Option<&'a Schema>,
268 pub file_sort_order: Vec<Vec<SortExpr>>,
270 pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
272 pub metadata_size_hint: Option<usize>,
274}
275
276impl Default for ParquetReadOptions<'_> {
277 fn default() -> Self {
278 Self {
279 file_extension: DEFAULT_PARQUET_EXTENSION,
280 table_partition_cols: vec![],
281 parquet_pruning: None,
282 skip_metadata: None,
283 schema: None,
284 file_sort_order: vec![],
285 file_decryption_properties: None,
286 metadata_size_hint: None,
287 }
288 }
289}
290
291impl<'a> ParquetReadOptions<'a> {
292 pub fn new() -> Self {
294 Default::default()
295 }
296
297 pub fn file_extension(mut self, file_extension: &'a str) -> Self {
299 self.file_extension = file_extension;
300 self
301 }
302
303 pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
305 self.parquet_pruning = Some(parquet_pruning);
306 self
307 }
308
309 pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
313 self.skip_metadata = Some(skip_metadata);
314 self
315 }
316
317 pub fn schema(mut self, schema: &'a Schema) -> Self {
319 self.schema = Some(schema);
320 self
321 }
322
323 pub fn table_partition_cols(
325 mut self,
326 table_partition_cols: Vec<(String, DataType)>,
327 ) -> Self {
328 self.table_partition_cols = table_partition_cols;
329 self
330 }
331
332 pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
334 self.file_sort_order = file_sort_order;
335 self
336 }
337
338 pub fn file_decryption_properties(
340 mut self,
341 file_decryption_properties: ConfigFileDecryptionProperties,
342 ) -> Self {
343 self.file_decryption_properties = Some(file_decryption_properties);
344 self
345 }
346
347 pub fn metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
349 self.metadata_size_hint = size_hint;
350 self
351 }
352}
353
354#[derive(Clone)]
361pub struct ArrowReadOptions<'a> {
362 pub schema: Option<&'a Schema>,
364
365 pub file_extension: &'a str,
368
369 pub table_partition_cols: Vec<(String, DataType)>,
371}
372
373impl Default for ArrowReadOptions<'_> {
374 fn default() -> Self {
375 Self {
376 schema: None,
377 file_extension: DEFAULT_ARROW_EXTENSION,
378 table_partition_cols: vec![],
379 }
380 }
381}
382
383impl<'a> ArrowReadOptions<'a> {
384 pub fn table_partition_cols(
386 mut self,
387 table_partition_cols: Vec<(String, DataType)>,
388 ) -> Self {
389 self.table_partition_cols = table_partition_cols;
390 self
391 }
392
393 pub fn schema(mut self, schema: &'a Schema) -> Self {
395 self.schema = Some(schema);
396 self
397 }
398}
399
400#[derive(Clone)]
407pub struct AvroReadOptions<'a> {
408 pub schema: Option<&'a Schema>,
410
411 pub file_extension: &'a str,
414 pub table_partition_cols: Vec<(String, DataType)>,
416}
417
418impl Default for AvroReadOptions<'_> {
419 fn default() -> Self {
420 Self {
421 schema: None,
422 file_extension: DEFAULT_AVRO_EXTENSION,
423 table_partition_cols: vec![],
424 }
425 }
426}
427
428impl<'a> AvroReadOptions<'a> {
429 pub fn table_partition_cols(
431 mut self,
432 table_partition_cols: Vec<(String, DataType)>,
433 ) -> Self {
434 self.table_partition_cols = table_partition_cols;
435 self
436 }
437
438 pub fn schema(mut self, schema: &'a Schema) -> Self {
440 self.schema = Some(schema);
441 self
442 }
443}
444
445#[derive(Clone)]
452pub struct NdJsonReadOptions<'a> {
453 pub schema: Option<&'a Schema>,
455 pub schema_infer_max_records: usize,
457 pub file_extension: &'a str,
460 pub table_partition_cols: Vec<(String, DataType)>,
462 pub file_compression_type: FileCompressionType,
464 pub infinite: bool,
466 pub file_sort_order: Vec<Vec<SortExpr>>,
468}
469
470impl Default for NdJsonReadOptions<'_> {
471 fn default() -> Self {
472 Self {
473 schema: None,
474 schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
475 file_extension: DEFAULT_JSON_EXTENSION,
476 table_partition_cols: vec![],
477 file_compression_type: FileCompressionType::UNCOMPRESSED,
478 infinite: false,
479 file_sort_order: vec![],
480 }
481 }
482}
483
484impl<'a> NdJsonReadOptions<'a> {
485 pub fn table_partition_cols(
487 mut self,
488 table_partition_cols: Vec<(String, DataType)>,
489 ) -> Self {
490 self.table_partition_cols = table_partition_cols;
491 self
492 }
493
494 pub fn file_extension(mut self, file_extension: &'a str) -> Self {
496 self.file_extension = file_extension;
497 self
498 }
499
500 pub fn mark_infinite(mut self, infinite: bool) -> Self {
502 self.infinite = infinite;
503 self
504 }
505
506 pub fn file_compression_type(
508 mut self,
509 file_compression_type: FileCompressionType,
510 ) -> Self {
511 self.file_compression_type = file_compression_type;
512 self
513 }
514
515 pub fn schema(mut self, schema: &'a Schema) -> Self {
517 self.schema = Some(schema);
518 self
519 }
520
521 pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
523 self.file_sort_order = file_sort_order;
524 self
525 }
526}
527
528#[async_trait]
529pub trait ReadOptions<'a> {
531 fn to_listing_options(
533 &self,
534 config: &SessionConfig,
535 table_options: TableOptions,
536 ) -> ListingOptions;
537
538 async fn get_resolved_schema(
540 &self,
541 config: &SessionConfig,
542 state: SessionState,
543 table_path: ListingTableUrl,
544 ) -> Result<SchemaRef>;
545
546 async fn _get_resolved_schema(
548 &'a self,
549 config: &SessionConfig,
550 state: SessionState,
551 table_path: ListingTableUrl,
552 schema: Option<&'a Schema>,
553 ) -> Result<SchemaRef>
554 where
555 'a: 'async_trait,
556 {
557 if let Some(s) = schema {
558 return Ok(Arc::new(s.to_owned()));
559 }
560
561 self.to_listing_options(config, state.default_table_options())
562 .infer_schema(&state, &table_path)
563 .await
564 }
565}
566
567#[async_trait]
568impl ReadOptions<'_> for CsvReadOptions<'_> {
569 fn to_listing_options(
570 &self,
571 config: &SessionConfig,
572 table_options: TableOptions,
573 ) -> ListingOptions {
574 let file_format = CsvFormat::default()
575 .with_options(table_options.csv)
576 .with_has_header(self.has_header)
577 .with_comment(self.comment)
578 .with_delimiter(self.delimiter)
579 .with_quote(self.quote)
580 .with_escape(self.escape)
581 .with_terminator(self.terminator)
582 .with_newlines_in_values(self.newlines_in_values)
583 .with_schema_infer_max_rec(self.schema_infer_max_records)
584 .with_file_compression_type(self.file_compression_type.to_owned())
585 .with_null_regex(self.null_regex.clone())
586 .with_truncated_rows(self.truncated_rows);
587
588 ListingOptions::new(Arc::new(file_format))
589 .with_file_extension(self.file_extension)
590 .with_session_config_options(config)
591 .with_table_partition_cols(self.table_partition_cols.clone())
592 .with_file_sort_order(self.file_sort_order.clone())
593 }
594
595 async fn get_resolved_schema(
596 &self,
597 config: &SessionConfig,
598 state: SessionState,
599 table_path: ListingTableUrl,
600 ) -> Result<SchemaRef> {
601 self._get_resolved_schema(config, state, table_path, self.schema)
602 .await
603 }
604}
605
/// Reads Parquet files through [`ParquetFormat`].
#[cfg(feature = "parquet")]
#[async_trait]
impl ReadOptions<'_> for ParquetReadOptions<'_> {
    /// Build listing options for Parquet files.
    ///
    /// The session's Parquet table options are the baseline. Each `Some`
    /// override held by these read options replaces the matching setting.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        let mut parquet_options = table_options.parquet;
        if let Some(decryption) = self.file_decryption_properties.as_ref() {
            parquet_options.crypto.file_decryption = Some(decryption.clone());
        }
        // Keep the session's hint unless one was supplied explicitly.
        parquet_options.global.metadata_size_hint = self
            .metadata_size_hint
            .or(parquet_options.global.metadata_size_hint);

        let format = ParquetFormat::new().with_options(parquet_options);
        let format = match self.parquet_pruning {
            Some(enabled) => format.with_enable_pruning(enabled),
            None => format,
        };
        let format = match self.skip_metadata {
            Some(skip) => format.with_skip_metadata(skip),
            None => format,
        };

        ListingOptions::new(Arc::new(format))
            .with_file_extension(self.file_extension)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
            .with_session_config_options(config)
    }

    /// Use the explicit schema if one was given; otherwise infer it.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        let explicit_schema = self.schema;
        self._get_resolved_schema(config, state, table_path, explicit_schema)
            .await
    }
}
649
650#[async_trait]
651impl ReadOptions<'_> for NdJsonReadOptions<'_> {
652 fn to_listing_options(
653 &self,
654 config: &SessionConfig,
655 table_options: TableOptions,
656 ) -> ListingOptions {
657 let file_format = JsonFormat::default()
658 .with_options(table_options.json)
659 .with_schema_infer_max_rec(self.schema_infer_max_records)
660 .with_file_compression_type(self.file_compression_type.to_owned());
661
662 ListingOptions::new(Arc::new(file_format))
663 .with_file_extension(self.file_extension)
664 .with_session_config_options(config)
665 .with_table_partition_cols(self.table_partition_cols.clone())
666 .with_file_sort_order(self.file_sort_order.clone())
667 }
668
669 async fn get_resolved_schema(
670 &self,
671 config: &SessionConfig,
672 state: SessionState,
673 table_path: ListingTableUrl,
674 ) -> Result<SchemaRef> {
675 self._get_resolved_schema(config, state, table_path, self.schema)
676 .await
677 }
678}
679
/// Reads Avro files through [`AvroFormat`].
#[cfg(feature = "avro")]
#[async_trait]
impl ReadOptions<'_> for AvroReadOptions<'_> {
    /// Build listing options for Avro files.
    ///
    /// The session's table options are not consulted for this format.
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        _table_options: TableOptions,
    ) -> ListingOptions {
        ListingOptions::new(Arc::new(AvroFormat))
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
    }

    /// Use the explicit schema if one was given; otherwise infer it.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        let explicit_schema = self.schema;
        self._get_resolved_schema(config, state, table_path, explicit_schema)
            .await
    }
}
706
707#[async_trait]
708impl ReadOptions<'_> for ArrowReadOptions<'_> {
709 fn to_listing_options(
710 &self,
711 config: &SessionConfig,
712 _table_options: TableOptions,
713 ) -> ListingOptions {
714 let file_format = ArrowFormat;
715
716 ListingOptions::new(Arc::new(file_format))
717 .with_file_extension(self.file_extension)
718 .with_session_config_options(config)
719 .with_table_partition_cols(self.table_partition_cols.clone())
720 }
721
722 async fn get_resolved_schema(
723 &self,
724 config: &SessionConfig,
725 state: SessionState,
726 table_path: ListingTableUrl,
727 ) -> Result<SchemaRef> {
728 self._get_resolved_schema(config, state, table_path, self.schema)
729 .await
730 }
731}