datafusion/datasource/file_format/
options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! User facing options for the file formats readers
19
20use std::sync::Arc;
21
22#[cfg(feature = "avro")]
23use crate::datasource::file_format::avro::AvroFormat;
24
25#[cfg(feature = "parquet")]
26use crate::datasource::file_format::parquet::ParquetFormat;
27
28use crate::datasource::file_format::arrow::ArrowFormat;
29use crate::datasource::file_format::file_compression_type::FileCompressionType;
30use crate::datasource::file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD;
31use crate::datasource::listing::ListingTableUrl;
32use crate::datasource::{file_format::csv::CsvFormat, listing::ListingOptions};
33use crate::error::Result;
34use crate::execution::context::{SessionConfig, SessionState};
35
36use arrow::datatypes::{DataType, Schema, SchemaRef};
37use datafusion_common::config::{ConfigFileDecryptionProperties, TableOptions};
38use datafusion_common::{
39    DEFAULT_ARROW_EXTENSION, DEFAULT_AVRO_EXTENSION, DEFAULT_CSV_EXTENSION,
40    DEFAULT_JSON_EXTENSION, DEFAULT_PARQUET_EXTENSION,
41};
42
43use async_trait::async_trait;
44use datafusion_datasource_json::file_format::JsonFormat;
45use datafusion_expr::SortExpr;
46
/// Options that control the reading of CSV files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct CsvReadOptions<'a> {
    /// Does the CSV file have a header?
    ///
    /// If schema inference is run on a file with no headers, default column names
    /// are created.
    pub has_header: bool,
    /// An optional column delimiter. Defaults to `b','`.
    pub delimiter: u8,
    /// An optional quote character. Defaults to `b'"'`.
    pub quote: u8,
    /// An optional terminator character. Defaults to None (CRLF).
    pub terminator: Option<u8>,
    /// An optional escape character. Defaults to None.
    pub escape: Option<u8>,
    /// If enabled, lines beginning with this byte are ignored.
    pub comment: Option<u8>,
    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub newlines_in_values: bool,
    /// An optional schema representing the CSV files. If None, CSV reader will try to infer it
    /// based on data in file.
    pub schema: Option<&'a Schema>,
    /// Max number of rows to read from CSV files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
    pub schema_infer_max_records: usize,
    /// File extension; only files with this extension are selected for data input.
    /// Defaults to `FileType::CSV.get_ext().as_str()`.
    pub file_extension: &'a str,
    /// Partition Columns
    pub table_partition_cols: Vec<(String, DataType)>,
    /// File compression type
    pub file_compression_type: FileCompressionType,
    /// Indicates how the file is sorted
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Optional regex to match null values
    pub null_regex: Option<String>,
    /// Whether to allow truncated rows when parsing.
    /// By default this is set to false and will error if the CSV rows have different lengths.
    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
    /// If the record’s schema is not nullable, then it will still return an error.
    pub truncated_rows: bool,
}
100
101impl Default for CsvReadOptions<'_> {
102    fn default() -> Self {
103        Self::new()
104    }
105}
106
impl<'a> CsvReadOptions<'a> {
    /// Create a CSV read option with default presets:
    /// header row expected, `,` delimiter, `"` quote, no escape/comment
    /// character, and uncompressed `.csv` files.
    pub fn new() -> Self {
        Self {
            has_header: true,
            schema: None,
            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
            delimiter: b',',
            quote: b'"',
            terminator: None,
            escape: None,
            newlines_in_values: false,
            file_extension: DEFAULT_CSV_EXTENSION,
            table_partition_cols: vec![],
            file_compression_type: FileCompressionType::UNCOMPRESSED,
            file_sort_order: vec![],
            comment: None,
            null_regex: None,
            truncated_rows: false,
        }
    }

    /// Configure has_header setting
    pub fn has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }

    /// Specify comment char to use for CSV read
    pub fn comment(mut self, comment: u8) -> Self {
        self.comment = Some(comment);
        self
    }

    /// Specify delimiter to use for CSV read
    pub fn delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// Specify quote to use for CSV read
    pub fn quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// Specify terminator to use for CSV read
    pub fn terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// Specify escape char to use for CSV read
    pub fn escape(mut self, escape: u8) -> Self {
        self.escape = Some(escape);
        self
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub fn newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = newlines_in_values;
        self
    }

    /// Specify the file extension for CSV file selection
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Configure delimiter setting with Option, None value will be ignored
    pub fn delimiter_option(mut self, delimiter: Option<u8>) -> Self {
        if let Some(d) = delimiter {
            self.delimiter = d;
        }
        self
    }

    /// Specify schema to use for CSV read
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify table_partition_cols for partition pruning
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Configure number of max records to read for schema inference
    pub fn schema_infer_max_records(mut self, max_records: usize) -> Self {
        self.schema_infer_max_records = max_records;
        self
    }

    /// Configure file compression type
    pub fn file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Configure if file has known sort order
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Configure the null parsing regex.
    pub fn null_regex(mut self, null_regex: Option<String>) -> Self {
        self.null_regex = null_regex;
        self
    }

    /// Configure whether to allow truncated rows when parsing.
    /// By default this is set to false and will error if the CSV rows have different lengths.
    /// When set to true then it will allow records with less than the expected number of columns and fill the missing columns with nulls.
    /// If the record’s schema is not nullable, then it will still return an error.
    pub fn truncated_rows(mut self, truncated_rows: bool) -> Self {
        self.truncated_rows = truncated_rows;
        self
    }
}
242
/// Options that control the reading of Parquet files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
    /// File extension; only files with this extension are selected for data input.
    /// Defaults to ".parquet".
    pub file_extension: &'a str,
    /// Partition Columns
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Should the parquet reader use the predicate to prune row groups?
    /// If None, uses value in SessionConfig
    pub parquet_pruning: Option<bool>,
    /// Should the parquet reader skip any metadata that may be in
    /// the file Schema? This can help avoid schema conflicts due to
    /// metadata.
    ///
    /// If None specified, uses value in SessionConfig
    pub skip_metadata: Option<bool>,
    /// An optional schema representing the parquet files. If None, parquet reader will try to infer it
    /// based on data in file.
    pub schema: Option<&'a Schema>,
    /// Indicates how the file is sorted
    pub file_sort_order: Vec<Vec<SortExpr>>,
    /// Properties for decryption of Parquet files that use modular encryption
    pub file_decryption_properties: Option<ConfigFileDecryptionProperties>,
    /// Metadata size hint for Parquet files reading (in bytes)
    pub metadata_size_hint: Option<usize>,
}
275
276impl Default for ParquetReadOptions<'_> {
277    fn default() -> Self {
278        Self {
279            file_extension: DEFAULT_PARQUET_EXTENSION,
280            table_partition_cols: vec![],
281            parquet_pruning: None,
282            skip_metadata: None,
283            schema: None,
284            file_sort_order: vec![],
285            file_decryption_properties: None,
286            metadata_size_hint: None,
287        }
288    }
289}
290
impl<'a> ParquetReadOptions<'a> {
    /// Create a new ParquetReadOptions with default values
    pub fn new() -> Self {
        Default::default()
    }

    /// Specify file_extension
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Specify parquet_pruning
    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
        self.parquet_pruning = Some(parquet_pruning);
        self
    }

    /// Tell the parquet reader to skip any metadata that may be in
    /// the file Schema. This can help avoid schema conflicts due to
    /// metadata.  Defaults to true.
    pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.skip_metadata = Some(skip_metadata);
        self
    }

    /// Specify schema to use for parquet read
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Specify table_partition_cols for partition pruning
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Configure if file has known sort order
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Configure file decryption properties for reading encrypted Parquet files
    pub fn file_decryption_properties(
        mut self,
        file_decryption_properties: ConfigFileDecryptionProperties,
    ) -> Self {
        self.file_decryption_properties = Some(file_decryption_properties);
        self
    }

    /// Configure metadata size hint for Parquet files reading (in bytes).
    /// Passing `None` clears any previously set hint.
    pub fn metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = size_hint;
        self
    }
}
353
/// Options that control the reading of ARROW files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ArrowReadOptions<'a> {
    /// The data source schema.
    pub schema: Option<&'a Schema>,

    /// File extension; only files with this extension are selected for data input.
    /// Defaults to `FileType::ARROW.get_ext().as_str()`.
    pub file_extension: &'a str,

    /// Partition Columns
    pub table_partition_cols: Vec<(String, DataType)>,
}
372
373impl Default for ArrowReadOptions<'_> {
374    fn default() -> Self {
375        Self {
376            schema: None,
377            file_extension: DEFAULT_ARROW_EXTENSION,
378            table_partition_cols: vec![],
379        }
380    }
381}
382
impl<'a> ArrowReadOptions<'a> {
    /// Specify table_partition_cols for partition pruning
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Specify schema to use for ARROW read
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }
}
399
/// Options that control the reading of AVRO files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
    /// The data source schema.
    pub schema: Option<&'a Schema>,

    /// File extension; only files with this extension are selected for data input.
    /// Defaults to `FileType::AVRO.get_ext().as_str()`.
    pub file_extension: &'a str,
    /// Partition Columns
    pub table_partition_cols: Vec<(String, DataType)>,
}
417
418impl Default for AvroReadOptions<'_> {
419    fn default() -> Self {
420        Self {
421            schema: None,
422            file_extension: DEFAULT_AVRO_EXTENSION,
423            table_partition_cols: vec![],
424        }
425    }
426}
427
428impl<'a> AvroReadOptions<'a> {
429    /// Specify table_partition_cols for partition pruning
430    pub fn table_partition_cols(
431        mut self,
432        table_partition_cols: Vec<(String, DataType)>,
433    ) -> Self {
434        self.table_partition_cols = table_partition_cols;
435        self
436    }
437
438    /// Specify schema to use for AVRO read
439    pub fn schema(mut self, schema: &'a Schema) -> Self {
440        self.schema = Some(schema);
441        self
442    }
443}
444
/// Options that control the reading of Line-delimited JSON files (NDJson)
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct NdJsonReadOptions<'a> {
    /// The data source schema.
    pub schema: Option<&'a Schema>,
    /// Max number of rows to read from JSON files for schema inference if needed. Defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`.
    pub schema_infer_max_records: usize,
    /// File extension; only files with this extension are selected for data input.
    /// Defaults to `FileType::JSON.get_ext().as_str()`.
    pub file_extension: &'a str,
    /// Partition Columns
    pub table_partition_cols: Vec<(String, DataType)>,
    /// File compression type
    pub file_compression_type: FileCompressionType,
    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
    pub infinite: bool,
    /// Indicates how the file is sorted
    pub file_sort_order: Vec<Vec<SortExpr>>,
}
469
470impl Default for NdJsonReadOptions<'_> {
471    fn default() -> Self {
472        Self {
473            schema: None,
474            schema_infer_max_records: DEFAULT_SCHEMA_INFER_MAX_RECORD,
475            file_extension: DEFAULT_JSON_EXTENSION,
476            table_partition_cols: vec![],
477            file_compression_type: FileCompressionType::UNCOMPRESSED,
478            infinite: false,
479            file_sort_order: vec![],
480        }
481    }
482}
483
impl<'a> NdJsonReadOptions<'a> {
    /// Specify table_partition_cols for partition pruning
    pub fn table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Specify file_extension
    pub fn file_extension(mut self, file_extension: &'a str) -> Self {
        self.file_extension = file_extension;
        self
    }

    /// Configure mark_infinite setting (whether the source may be unbounded,
    /// e.g. a FIFO file)
    pub fn mark_infinite(mut self, infinite: bool) -> Self {
        self.infinite = infinite;
        self
    }

    /// Specify file_compression_type
    pub fn file_compression_type(
        mut self,
        file_compression_type: FileCompressionType,
    ) -> Self {
        self.file_compression_type = file_compression_type;
        self
    }

    /// Specify schema to use for NdJson read
    pub fn schema(mut self, schema: &'a Schema) -> Self {
        self.schema = Some(schema);
        self
    }

    /// Configure if file has known sort order
    pub fn file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }
}
527
#[async_trait]
/// [`ReadOptions`] is implemented by Options like [`CsvReadOptions`] that control the reading of respective files/sources.
pub trait ReadOptions<'a> {
    /// Helper to convert these user facing options to `ListingTable` options
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions;

    /// Infer and resolve the schema from the files/sources provided.
    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef>;

    /// Helper function to reduce repetitive code. Returns the user-supplied
    /// `schema` as-is if present; otherwise infers the schema from the
    /// sources. Infinite data sources not supported through this function.
    async fn _get_resolved_schema(
        &'a self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
        schema: Option<&'a Schema>,
    ) -> Result<SchemaRef>
    where
        // `self` must outlive the future generated by async_trait.
        'a: 'async_trait,
    {
        // An explicitly provided schema takes precedence over inference.
        if let Some(s) = schema {
            return Ok(Arc::new(s.to_owned()));
        }

        self.to_listing_options(config, state.default_table_options())
            .infer_schema(&state, &table_path)
            .await
    }
}
566
#[async_trait]
impl ReadOptions<'_> for CsvReadOptions<'_> {
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        // Start from the session-level CSV table options, then layer the
        // per-read settings from this struct on top.
        let file_format = CsvFormat::default()
            .with_options(table_options.csv)
            .with_has_header(self.has_header)
            .with_comment(self.comment)
            .with_delimiter(self.delimiter)
            .with_quote(self.quote)
            .with_escape(self.escape)
            .with_terminator(self.terminator)
            .with_newlines_in_values(self.newlines_in_values)
            .with_schema_infer_max_rec(self.schema_infer_max_records)
            .with_file_compression_type(self.file_compression_type.to_owned())
            .with_null_regex(self.null_regex.clone())
            .with_truncated_rows(self.truncated_rows);

        ListingOptions::new(Arc::new(file_format))
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
    }

    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        // Use the user-provided schema if set; otherwise infer from files.
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
605
#[cfg(feature = "parquet")]
#[async_trait]
impl ReadOptions<'_> for ParquetReadOptions<'_> {
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        // Start from the session-level Parquet options, then override with
        // any per-read settings from this struct.
        let mut options = table_options.parquet;
        if let Some(file_decryption_properties) = &self.file_decryption_properties {
            options.crypto.file_decryption = Some(file_decryption_properties.clone());
        }
        // The session-level metadata size hint can be overridden per-read
        // via ParquetReadOptions, if set.
        if let Some(metadata_size_hint) = self.metadata_size_hint {
            options.global.metadata_size_hint = Some(metadata_size_hint);
        }

        let mut file_format = ParquetFormat::new().with_options(options);

        // `None` means "defer to SessionConfig" for these two knobs.
        if let Some(parquet_pruning) = self.parquet_pruning {
            file_format = file_format.with_enable_pruning(parquet_pruning)
        }
        if let Some(skip_metadata) = self.skip_metadata {
            file_format = file_format.with_skip_metadata(skip_metadata)
        }

        ListingOptions::new(Arc::new(file_format))
            .with_file_extension(self.file_extension)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
            .with_session_config_options(config)
    }

    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        // Use the user-provided schema if set; otherwise infer from files.
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
649
#[async_trait]
impl ReadOptions<'_> for NdJsonReadOptions<'_> {
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        table_options: TableOptions,
    ) -> ListingOptions {
        // Start from the session-level JSON options, then layer the
        // per-read settings from this struct on top.
        let file_format = JsonFormat::default()
            .with_options(table_options.json)
            .with_schema_infer_max_rec(self.schema_infer_max_records)
            .with_file_compression_type(self.file_compression_type.to_owned());

        ListingOptions::new(Arc::new(file_format))
            .with_file_extension(self.file_extension)
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
            .with_file_sort_order(self.file_sort_order.clone())
    }

    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        // Use the user-provided schema if set; otherwise infer from files.
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
679
#[cfg(feature = "avro")]
#[async_trait]
impl ReadOptions<'_> for AvroReadOptions<'_> {
    fn to_listing_options(
        &self,
        config: &SessionConfig,
        _table_options: TableOptions,
    ) -> ListingOptions {
        // Avro has no format-level tunables here, so only the listing-level
        // settings (extension, session config, partition columns) apply.
        let listing = ListingOptions::new(Arc::new(AvroFormat))
            .with_file_extension(self.file_extension);
        listing
            .with_session_config_options(config)
            .with_table_partition_cols(self.table_partition_cols.clone())
    }

    async fn get_resolved_schema(
        &self,
        config: &SessionConfig,
        state: SessionState,
        table_path: ListingTableUrl,
    ) -> Result<SchemaRef> {
        // Prefer the user-provided schema; otherwise infer from the files.
        self._get_resolved_schema(config, state, table_path, self.schema)
            .await
    }
}
706
707#[async_trait]
708impl ReadOptions<'_> for ArrowReadOptions<'_> {
709    fn to_listing_options(
710        &self,
711        config: &SessionConfig,
712        _table_options: TableOptions,
713    ) -> ListingOptions {
714        let file_format = ArrowFormat;
715
716        ListingOptions::new(Arc::new(file_format))
717            .with_file_extension(self.file_extension)
718            .with_session_config_options(config)
719            .with_table_partition_cols(self.table_partition_cols.clone())
720    }
721
722    async fn get_resolved_schema(
723        &self,
724        config: &SessionConfig,
725        state: SessionState,
726        table_path: ListingTableUrl,
727    ) -> Result<SchemaRef> {
728        self._get_resolved_schema(config, state, table_path, self.schema)
729            .await
730    }
731}