datafusion_catalog_listing/
config.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::options::ListingOptions;
19use arrow::datatypes::{DataType, Schema, SchemaRef};
20use datafusion_catalog::Session;
21use datafusion_common::{config_err, internal_err};
22use datafusion_datasource::file_compression_type::FileCompressionType;
23use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
24use datafusion_datasource::ListingTableUrl;
25use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
26use std::str::FromStr;
27use std::sync::Arc;
28
/// Indicates the source of the schema for a [`crate::ListingTable`]
///
/// A configuration starts out as [`SchemaSource::Unset`] and moves to
/// [`SchemaSource::Specified`] or [`SchemaSource::Inferred`] once a schema has
/// been supplied explicitly or derived from the data, respectively.
// `PartialEq` is required for `assert_eq!` in tests. `Eq` and `Hash` are derived
// alongside it: the enum carries no data, so equality is total and the value can
// be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum SchemaSource {
    /// Schema is not yet set (initial state)
    #[default]
    Unset,
    /// Schema was inferred from first table_path
    Inferred,
    /// Schema was specified explicitly via with_schema
    Specified,
}
41
/// Configuration for creating a [`crate::ListingTable`]
///
/// A configuration is built from one or more table paths and then refined
/// with the `with_*` / `infer_*` methods, each of which consumes the
/// configuration and returns the updated one.
///
/// # Schema Evolution Support
///
/// This configuration supports schema evolution through the optional
/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
///
/// - **Type coercion requirements**: When you need custom logic for converting between
///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
/// - **Column mapping**: You need to map columns with a legacy name to a new name
/// - **Custom handling of missing columns**: By default they are filled in with nulls,
///   but you may e.g. want to fill them in with `0` or `""`.
///
/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`]
/// will be used, which handles basic schema compatibility cases.
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
    /// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
    /// They should share the same schema and object store.
    pub table_paths: Vec<ListingTableUrl>,
    /// Optional `SchemaRef` for the to-be-created [`crate::ListingTable`].
    ///
    /// `None` until a schema is supplied or inferred.
    ///
    /// See details on [`ListingTableConfig::with_schema`]
    pub file_schema: Option<SchemaRef>,
    /// Optional [`ListingOptions`] for the to-be-created [`crate::ListingTable`].
    ///
    /// See details on [`ListingTableConfig::with_listing_options`]
    pub options: Option<ListingOptions>,
    /// Tracks the source of the schema information.
    ///
    /// Read it through [`ListingTableConfig::schema_source`].
    pub(crate) schema_source: SchemaSource,
    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
76
77impl ListingTableConfig {
78    /// Creates new [`ListingTableConfig`] for reading the specified URL
79    pub fn new(table_path: ListingTableUrl) -> Self {
80        Self {
81            table_paths: vec![table_path],
82            ..Default::default()
83        }
84    }
85
86    /// Creates new [`ListingTableConfig`] with multiple table paths.
87    ///
88    ///  See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths
89    pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
90        Self {
91            table_paths,
92            ..Default::default()
93        }
94    }
95
96    /// Returns the source of the schema for this configuration
97    pub fn schema_source(&self) -> SchemaSource {
98        self.schema_source
99    }
100    /// Set the `schema` for the overall [`crate::ListingTable`]
101    ///
102    /// [`crate::ListingTable`] will automatically coerce, when possible, the schema
103    /// for individual files to match this schema.
104    ///
105    /// If a schema is not provided, it is inferred using
106    /// [`Self::infer_schema`].
107    ///
108    /// If the schema is provided, it must contain only the fields in the file
109    /// without the table partitioning columns.
110    ///
111    /// # Example: Specifying Table Schema
112    /// ```rust
113    /// # use std::sync::Arc;
114    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
115    /// # use datafusion_datasource::ListingTableUrl;
116    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
117    /// # use arrow::datatypes::{Schema, Field, DataType};
118    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
119    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
120    /// let schema = Arc::new(Schema::new(vec![
121    ///     Field::new("id", DataType::Int64, false),
122    ///     Field::new("name", DataType::Utf8, true),
123    /// ]));
124    ///
125    /// let config = ListingTableConfig::new(table_paths)
126    ///     .with_listing_options(listing_options)  // Set options first
127    ///     .with_schema(schema);                    // Then set schema
128    /// ```
129    pub fn with_schema(self, schema: SchemaRef) -> Self {
130        // Note: We preserve existing options state, but downstream code may expect
131        // options to be set. Consider calling with_listing_options() or infer_options()
132        // before operations that require options to be present.
133        debug_assert!(
134            self.options.is_some() || cfg!(test),
135            "ListingTableConfig::with_schema called without options set. \
136             Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
137        );
138
139        Self {
140            file_schema: Some(schema),
141            schema_source: SchemaSource::Specified,
142            ..self
143        }
144    }
145
146    /// Add `listing_options` to [`ListingTableConfig`]
147    ///
148    /// If not provided, format and other options are inferred via
149    /// `ListingTableConfigExt::infer_options`.
150    ///
151    /// # Example: Configuring Parquet Files with Custom Options
152    /// ```rust
153    /// # use std::sync::Arc;
154    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
155    /// # use datafusion_datasource::ListingTableUrl;
156    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
157    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
158    /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
159    ///     .with_file_extension(".parquet")
160    ///     .with_collect_stat(true);
161    ///
162    /// let config = ListingTableConfig::new(table_paths).with_listing_options(options);
163    /// // Configure file format and options
164    /// ```
165    pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
166        // Note: This method properly sets options, but be aware that downstream
167        // methods like infer_schema() and try_new() require both schema and options
168        // to be set to function correctly.
169        debug_assert!(
170            !self.table_paths.is_empty() || cfg!(test),
171            "ListingTableConfig::with_listing_options called without table_paths set. \
172             Consider calling new() or new_with_multi_paths() first to establish table paths."
173        );
174
175        Self {
176            options: Some(listing_options),
177            ..self
178        }
179    }
180
181    /// Returns a tuple of `(file_extension, optional compression_extension)`
182    ///
183    /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
184    /// For example a path ending with blah.test.csv returns `("csv", None)`
185    pub fn infer_file_extension_and_compression_type(
186        path: &str,
187    ) -> datafusion_common::Result<(String, Option<String>)> {
188        let mut exts = path.rsplit('.');
189
190        let split = exts.next().unwrap_or("");
191
192        let file_compression_type = FileCompressionType::from_str(split)
193            .unwrap_or(FileCompressionType::UNCOMPRESSED);
194
195        if file_compression_type.is_compressed() {
196            let split2 = exts.next().unwrap_or("");
197            Ok((split2.to_string(), Some(split.to_string())))
198        } else {
199            Ok((split.to_string(), None))
200        }
201    }
202
203    /// Infer the [`SchemaRef`] based on `table_path`s.
204    ///
205    /// This method infers the table schema using the first `table_path`.
206    /// See [`ListingOptions::infer_schema`] for more details
207    ///
208    /// # Errors
209    /// * if `self.options` is not set. See [`Self::with_listing_options`]
210    pub async fn infer_schema(
211        self,
212        state: &dyn Session,
213    ) -> datafusion_common::Result<Self> {
214        match self.options {
215            Some(options) => {
216                let ListingTableConfig {
217                    table_paths,
218                    file_schema,
219                    options: _,
220                    schema_source,
221                    schema_adapter_factory,
222                    expr_adapter_factory: physical_expr_adapter_factory,
223                } = self;
224
225                let (schema, new_schema_source) = match file_schema {
226                    Some(schema) => (schema, schema_source), // Keep existing source if schema exists
227                    None => {
228                        if let Some(url) = table_paths.first() {
229                            (
230                                options.infer_schema(state, url).await?,
231                                SchemaSource::Inferred,
232                            )
233                        } else {
234                            (Arc::new(Schema::empty()), SchemaSource::Inferred)
235                        }
236                    }
237                };
238
239                Ok(Self {
240                    table_paths,
241                    file_schema: Some(schema),
242                    options: Some(options),
243                    schema_source: new_schema_source,
244                    schema_adapter_factory,
245                    expr_adapter_factory: physical_expr_adapter_factory,
246                })
247            }
248            None => internal_err!("No `ListingOptions` set for inferring schema"),
249        }
250    }
251
252    /// Infer the partition columns from `table_paths`.
253    ///
254    /// # Errors
255    /// * if `self.options` is not set. See [`Self::with_listing_options`]
256    pub async fn infer_partitions_from_path(
257        self,
258        state: &dyn Session,
259    ) -> datafusion_common::Result<Self> {
260        match self.options {
261            Some(options) => {
262                let Some(url) = self.table_paths.first() else {
263                    return config_err!("No table path found");
264                };
265                let partitions = options
266                    .infer_partitions(state, url)
267                    .await?
268                    .into_iter()
269                    .map(|col_name| {
270                        (
271                            col_name,
272                            DataType::Dictionary(
273                                Box::new(DataType::UInt16),
274                                Box::new(DataType::Utf8),
275                            ),
276                        )
277                    })
278                    .collect::<Vec<_>>();
279                let options = options.with_table_partition_cols(partitions);
280                Ok(Self {
281                    table_paths: self.table_paths,
282                    file_schema: self.file_schema,
283                    options: Some(options),
284                    schema_source: self.schema_source,
285                    schema_adapter_factory: self.schema_adapter_factory,
286                    expr_adapter_factory: self.expr_adapter_factory,
287                })
288            }
289            None => config_err!("No `ListingOptions` set for inferring schema"),
290        }
291    }
292
293    /// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
294    ///
295    /// The schema adapter factory is used to create schema adapters that can
296    /// handle schema evolution and type conversions when reading files with
297    /// different schemas than the table schema.
298    ///
299    /// If not provided, a default schema adapter factory will be used.
300    ///
301    /// # Example: Custom Schema Adapter for Type Coercion
302    /// ```rust
303    /// # use std::sync::Arc;
304    /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
305    /// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
306    /// # use datafusion_datasource::ListingTableUrl;
307    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
308    /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
309    /// #
310    /// # #[derive(Debug)]
311    /// # struct MySchemaAdapterFactory;
312    /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
313    /// #     fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
314    /// #         unimplemented!()
315    /// #     }
316    /// # }
317    /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
318    /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
319    /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
320    /// let config = ListingTableConfig::new(table_paths)
321    ///     .with_listing_options(listing_options)
322    ///     .with_schema(table_schema)
323    ///     .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
324    /// ```
325    pub fn with_schema_adapter_factory(
326        self,
327        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
328    ) -> Self {
329        Self {
330            schema_adapter_factory: Some(schema_adapter_factory),
331            ..self
332        }
333    }
334
335    /// Get the [`SchemaAdapterFactory`] for this configuration
336    pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
337        self.schema_adapter_factory.as_ref()
338    }
339
340    /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
341    ///
342    /// The expression adapter factory is used to create physical expression adapters that can
343    /// handle schema evolution and type conversions when evaluating expressions
344    /// with different schemas than the table schema.
345    ///
346    /// If not provided, a default physical expression adapter factory will be used unless a custom
347    /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
348    ///
349    /// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
350    pub fn with_expr_adapter_factory(
351        self,
352        expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
353    ) -> Self {
354        Self {
355            expr_adapter_factory: Some(expr_adapter_factory),
356            ..self
357        }
358    }
359}