datafusion_catalog_listing/config.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::options::ListingOptions;
19use arrow::datatypes::{DataType, Schema, SchemaRef};
20use datafusion_catalog::Session;
21use datafusion_common::{config_err, internal_err};
22use datafusion_datasource::file_compression_type::FileCompressionType;
23use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
24use datafusion_datasource::ListingTableUrl;
25use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
26use std::str::FromStr;
27use std::sync::Arc;
28
/// Indicates the source of the schema for a [`crate::ListingTable`]
///
/// The value is tracked on [`ListingTableConfig`] and can be read back with
/// [`ListingTableConfig::schema_source`].
// PartialEq required for assert_eq! in tests
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum SchemaSource {
    /// Schema is not yet set (initial state)
    ///
    /// This is the [`Default`] variant, so a configuration created through
    /// [`ListingTableConfig::new`] starts out in this state.
    #[default]
    Unset,
    /// Schema was inferred from first table_path
    ///
    /// Recorded by [`ListingTableConfig::infer_schema`] when no schema had
    /// been set before the call.
    Inferred,
    /// Schema was specified explicitly via with_schema
    ///
    /// Recorded by [`ListingTableConfig::with_schema`].
    Specified,
}
41
/// Configuration for creating a [`crate::ListingTable`]
///
/// A configuration is created with [`ListingTableConfig::new`] or
/// [`ListingTableConfig::new_with_multi_paths`] and then refined through the
/// `with_*` and `infer_*` methods, each of which consumes the configuration
/// and returns the updated one.
///
/// # Schema Evolution Support
///
/// This configuration supports schema evolution through the optional
/// [`SchemaAdapterFactory`]. You might want to override the default factory when you need:
///
/// - **Type coercion requirements**: When you need custom logic for converting between
///   different Arrow data types (e.g., Int32 ↔ Int64, Utf8 ↔ LargeUtf8)
/// - **Column mapping**: You need to map columns with a legacy name to a new name
/// - **Custom handling of missing columns**: By default they are filled in with nulls,
///   but you may e.g. want to fill them in with `0` or `""`.
///
/// If not specified, a [`datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory`]
/// will be used, which handles basic schema compatibility cases.
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
    /// Paths on the `ObjectStore` for creating [`crate::ListingTable`].
    /// They should share the same schema and object store.
    pub table_paths: Vec<ListingTableUrl>,
    /// Optional `SchemaRef` for the to be created [`crate::ListingTable`].
    ///
    /// `None` until a schema is supplied via [`ListingTableConfig::with_schema`]
    /// or obtained via [`ListingTableConfig::infer_schema`].
    ///
    /// See details on [`ListingTableConfig::with_schema`]
    pub file_schema: Option<SchemaRef>,
    /// Optional [`ListingOptions`] for the to be created [`crate::ListingTable`].
    ///
    /// [`ListingTableConfig::infer_schema`] and
    /// [`ListingTableConfig::infer_partitions_from_path`] return an error
    /// while this is `None`.
    ///
    /// See details on [`ListingTableConfig::with_listing_options`]
    pub options: Option<ListingOptions>,
    /// Tracks the source of the schema information
    ///
    /// Starts as [`SchemaSource::Unset`]; read it through
    /// [`ListingTableConfig::schema_source`].
    pub(crate) schema_source: SchemaSource,
    /// Optional [`SchemaAdapterFactory`] for creating schema adapters
    ///
    /// Set through [`ListingTableConfig::with_schema_adapter_factory`].
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Optional [`PhysicalExprAdapterFactory`] for creating physical expression adapters
    ///
    /// Set through [`ListingTableConfig::with_expr_adapter_factory`].
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
76
77impl ListingTableConfig {
78 /// Creates new [`ListingTableConfig`] for reading the specified URL
79 pub fn new(table_path: ListingTableUrl) -> Self {
80 Self {
81 table_paths: vec![table_path],
82 ..Default::default()
83 }
84 }
85
86 /// Creates new [`ListingTableConfig`] with multiple table paths.
87 ///
88 /// See `ListingTableConfigExt::infer_options` for details on what happens with multiple paths
89 pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
90 Self {
91 table_paths,
92 ..Default::default()
93 }
94 }
95
/// Returns the source of the schema for this configuration
///
/// The value starts as [`SchemaSource::Unset`]. [`Self::with_schema`] records
/// [`SchemaSource::Specified`], and [`Self::infer_schema`] records
/// [`SchemaSource::Inferred`] when it has to infer the schema itself.
pub fn schema_source(&self) -> SchemaSource {
    self.schema_source
}
100 /// Set the `schema` for the overall [`crate::ListingTable`]
101 ///
102 /// [`crate::ListingTable`] will automatically coerce, when possible, the schema
103 /// for individual files to match this schema.
104 ///
105 /// If a schema is not provided, it is inferred using
106 /// [`Self::infer_schema`].
107 ///
108 /// If the schema is provided, it must contain only the fields in the file
109 /// without the table partitioning columns.
110 ///
111 /// # Example: Specifying Table Schema
112 /// ```rust
113 /// # use std::sync::Arc;
114 /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
115 /// # use datafusion_datasource::ListingTableUrl;
116 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
117 /// # use arrow::datatypes::{Schema, Field, DataType};
118 /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
119 /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
120 /// let schema = Arc::new(Schema::new(vec![
121 /// Field::new("id", DataType::Int64, false),
122 /// Field::new("name", DataType::Utf8, true),
123 /// ]));
124 ///
125 /// let config = ListingTableConfig::new(table_paths)
126 /// .with_listing_options(listing_options) // Set options first
127 /// .with_schema(schema); // Then set schema
128 /// ```
129 pub fn with_schema(self, schema: SchemaRef) -> Self {
130 // Note: We preserve existing options state, but downstream code may expect
131 // options to be set. Consider calling with_listing_options() or infer_options()
132 // before operations that require options to be present.
133 debug_assert!(
134 self.options.is_some() || cfg!(test),
135 "ListingTableConfig::with_schema called without options set. \
136 Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
137 );
138
139 Self {
140 file_schema: Some(schema),
141 schema_source: SchemaSource::Specified,
142 ..self
143 }
144 }
145
146 /// Add `listing_options` to [`ListingTableConfig`]
147 ///
148 /// If not provided, format and other options are inferred via
149 /// `ListingTableConfigExt::infer_options`.
150 ///
151 /// # Example: Configuring Parquet Files with Custom Options
152 /// ```rust
153 /// # use std::sync::Arc;
154 /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
155 /// # use datafusion_datasource::ListingTableUrl;
156 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
157 /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
158 /// let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
159 /// .with_file_extension(".parquet")
160 /// .with_collect_stat(true);
161 ///
162 /// let config = ListingTableConfig::new(table_paths).with_listing_options(options);
163 /// // Configure file format and options
164 /// ```
165 pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
166 // Note: This method properly sets options, but be aware that downstream
167 // methods like infer_schema() and try_new() require both schema and options
168 // to be set to function correctly.
169 debug_assert!(
170 !self.table_paths.is_empty() || cfg!(test),
171 "ListingTableConfig::with_listing_options called without table_paths set. \
172 Consider calling new() or new_with_multi_paths() first to establish table paths."
173 );
174
175 Self {
176 options: Some(listing_options),
177 ..self
178 }
179 }
180
181 /// Returns a tuple of `(file_extension, optional compression_extension)`
182 ///
183 /// For example a path ending with blah.test.csv.gz returns `("csv", Some("gz"))`
184 /// For example a path ending with blah.test.csv returns `("csv", None)`
185 pub fn infer_file_extension_and_compression_type(
186 path: &str,
187 ) -> datafusion_common::Result<(String, Option<String>)> {
188 let mut exts = path.rsplit('.');
189
190 let split = exts.next().unwrap_or("");
191
192 let file_compression_type = FileCompressionType::from_str(split)
193 .unwrap_or(FileCompressionType::UNCOMPRESSED);
194
195 if file_compression_type.is_compressed() {
196 let split2 = exts.next().unwrap_or("");
197 Ok((split2.to_string(), Some(split.to_string())))
198 } else {
199 Ok((split.to_string(), None))
200 }
201 }
202
203 /// Infer the [`SchemaRef`] based on `table_path`s.
204 ///
205 /// This method infers the table schema using the first `table_path`.
206 /// See [`ListingOptions::infer_schema`] for more details
207 ///
208 /// # Errors
209 /// * if `self.options` is not set. See [`Self::with_listing_options`]
210 pub async fn infer_schema(
211 self,
212 state: &dyn Session,
213 ) -> datafusion_common::Result<Self> {
214 match self.options {
215 Some(options) => {
216 let ListingTableConfig {
217 table_paths,
218 file_schema,
219 options: _,
220 schema_source,
221 schema_adapter_factory,
222 expr_adapter_factory: physical_expr_adapter_factory,
223 } = self;
224
225 let (schema, new_schema_source) = match file_schema {
226 Some(schema) => (schema, schema_source), // Keep existing source if schema exists
227 None => {
228 if let Some(url) = table_paths.first() {
229 (
230 options.infer_schema(state, url).await?,
231 SchemaSource::Inferred,
232 )
233 } else {
234 (Arc::new(Schema::empty()), SchemaSource::Inferred)
235 }
236 }
237 };
238
239 Ok(Self {
240 table_paths,
241 file_schema: Some(schema),
242 options: Some(options),
243 schema_source: new_schema_source,
244 schema_adapter_factory,
245 expr_adapter_factory: physical_expr_adapter_factory,
246 })
247 }
248 None => internal_err!("No `ListingOptions` set for inferring schema"),
249 }
250 }
251
252 /// Infer the partition columns from `table_paths`.
253 ///
254 /// # Errors
255 /// * if `self.options` is not set. See [`Self::with_listing_options`]
256 pub async fn infer_partitions_from_path(
257 self,
258 state: &dyn Session,
259 ) -> datafusion_common::Result<Self> {
260 match self.options {
261 Some(options) => {
262 let Some(url) = self.table_paths.first() else {
263 return config_err!("No table path found");
264 };
265 let partitions = options
266 .infer_partitions(state, url)
267 .await?
268 .into_iter()
269 .map(|col_name| {
270 (
271 col_name,
272 DataType::Dictionary(
273 Box::new(DataType::UInt16),
274 Box::new(DataType::Utf8),
275 ),
276 )
277 })
278 .collect::<Vec<_>>();
279 let options = options.with_table_partition_cols(partitions);
280 Ok(Self {
281 table_paths: self.table_paths,
282 file_schema: self.file_schema,
283 options: Some(options),
284 schema_source: self.schema_source,
285 schema_adapter_factory: self.schema_adapter_factory,
286 expr_adapter_factory: self.expr_adapter_factory,
287 })
288 }
289 None => config_err!("No `ListingOptions` set for inferring schema"),
290 }
291 }
292
293 /// Set the [`SchemaAdapterFactory`] for the [`crate::ListingTable`]
294 ///
295 /// The schema adapter factory is used to create schema adapters that can
296 /// handle schema evolution and type conversions when reading files with
297 /// different schemas than the table schema.
298 ///
299 /// If not provided, a default schema adapter factory will be used.
300 ///
301 /// # Example: Custom Schema Adapter for Type Coercion
302 /// ```rust
303 /// # use std::sync::Arc;
304 /// # use datafusion_catalog_listing::{ListingTableConfig, ListingOptions};
305 /// # use datafusion_datasource::schema_adapter::{SchemaAdapterFactory, SchemaAdapter};
306 /// # use datafusion_datasource::ListingTableUrl;
307 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
308 /// # use arrow::datatypes::{SchemaRef, Schema, Field, DataType};
309 /// #
310 /// # #[derive(Debug)]
311 /// # struct MySchemaAdapterFactory;
312 /// # impl SchemaAdapterFactory for MySchemaAdapterFactory {
313 /// # fn create(&self, _projected_table_schema: SchemaRef, _file_schema: SchemaRef) -> Box<dyn SchemaAdapter> {
314 /// # unimplemented!()
315 /// # }
316 /// # }
317 /// # let table_paths = ListingTableUrl::parse("file:///path/to/data").unwrap();
318 /// # let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
319 /// # let table_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
320 /// let config = ListingTableConfig::new(table_paths)
321 /// .with_listing_options(listing_options)
322 /// .with_schema(table_schema)
323 /// .with_schema_adapter_factory(Arc::new(MySchemaAdapterFactory));
324 /// ```
325 pub fn with_schema_adapter_factory(
326 self,
327 schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
328 ) -> Self {
329 Self {
330 schema_adapter_factory: Some(schema_adapter_factory),
331 ..self
332 }
333 }
334
/// Get the [`SchemaAdapterFactory`] for this configuration
///
/// Returns `None` when no factory has been set, e.g. through
/// [`Self::with_schema_adapter_factory`]. The factory is returned by
/// reference; the `Arc` is not cloned.
pub fn schema_adapter_factory(&self) -> Option<&Arc<dyn SchemaAdapterFactory>> {
    self.schema_adapter_factory.as_ref()
}
339
340 /// Set the [`PhysicalExprAdapterFactory`] for the [`crate::ListingTable`]
341 ///
342 /// The expression adapter factory is used to create physical expression adapters that can
343 /// handle schema evolution and type conversions when evaluating expressions
344 /// with different schemas than the table schema.
345 ///
346 /// If not provided, a default physical expression adapter factory will be used unless a custom
347 /// `SchemaAdapterFactory` is set, in which case only the `SchemaAdapterFactory` will be used.
348 ///
349 /// See <https://github.com/apache/datafusion/issues/16800> for details on this transition.
350 pub fn with_expr_adapter_factory(
351 self,
352 expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
353 ) -> Self {
354 Self {
355 expr_adapter_factory: Some(expr_adapter_factory),
356 ..self
357 }
358 }
359}