// datafusion_catalog_listing/options.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::datatypes::{DataType, SchemaRef};
19use datafusion_catalog::Session;
20use datafusion_common::plan_err;
21use datafusion_datasource::file_format::FileFormat;
22use datafusion_datasource::ListingTableUrl;
23use datafusion_execution::config::SessionConfig;
24use datafusion_expr::SortExpr;
25use futures::StreamExt;
26use futures::{future, TryStreamExt};
27use itertools::Itertools;
28use std::sync::Arc;
29
/// Options for creating a [`crate::ListingTable`]
///
/// Construct via [`ListingOptions::new`] and customize with the
/// `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct ListingOptions {
    /// A suffix on which files should be filtered (leave empty to
    /// keep all files on the path)
    pub file_extension: String,
    /// The file format (e.g. Parquet, CSV) used to read and infer
    /// the schema of the underlying files
    pub format: Arc<dyn FileFormat>,
    /// The expected partition column names in the folder structure.
    /// See [Self::with_table_partition_cols] for details
    pub table_partition_cols: Vec<(String, DataType)>,
    /// Set true to try to guess statistics from the files.
    /// This can add a lot of overhead as it will usually require files
    /// to be opened and at least partially parsed.
    pub collect_stat: bool,
    /// Group files to avoid that the number of partitions exceeds
    /// this limit
    pub target_partitions: usize,
    /// Optional pre-known sort order(s). Must be `SortExpr`s.
    ///
    /// DataFusion may take advantage of this ordering to omit sorts
    /// or use more efficient algorithms. Currently sortedness must be
    /// provided if it is known by some external mechanism, but may in
    /// the future be automatically determined, for example using
    /// parquet metadata.
    ///
    /// See <https://github.com/apache/datafusion/issues/4177>
    ///
    /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
    ///       where each ordering consists of an individual lexicographic
    ///       ordering (encapsulated by a `Vec<SortExpr>`). If there aren't
    ///       multiple equivalent orderings, the outer `Vec` will have a
    ///       single element.
    pub file_sort_order: Vec<Vec<SortExpr>>,
}
65
impl ListingOptions {
    /// Creates an options instance with the given format
    ///
    /// Default values:
    /// - use default file extension filter (taken from `format.get_ext()`)
    /// - no input partition to discover
    /// - one target partition
    /// - do not collect statistics
    pub fn new(format: Arc<dyn FileFormat>) -> Self {
        Self {
            file_extension: format.get_ext(),
            format,
            table_partition_cols: vec![],
            collect_stat: false,
            target_partitions: 1,
            file_sort_order: vec![],
        }
    }

    /// Set options from [`SessionConfig`] and returns self.
    ///
    /// Currently this sets `target_partitions` and `collect_stat`
    /// but if more options are added in the future that need to be coordinated
    /// they will be synchronized through this method.
    pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
        self = self.with_target_partitions(config.target_partitions());
        self = self.with_collect_stat(config.collect_statistics());
        self
    }

    /// Set file extension on [`ListingOptions`] and returns self.
    ///
    /// # Example
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_extension(".parquet");
    ///
    /// assert_eq!(listing_options.file_extension, ".parquet");
    /// ```
    pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
        self.file_extension = file_extension.into();
        self
    }

    /// Optionally set file extension on [`ListingOptions`] and returns self.
    ///
    /// If `file_extension` is `None`, the file extension will not be changed
    ///
    /// # Example
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// let extension = Some(".parquet");
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_extension_opt(extension);
    ///
    /// assert_eq!(listing_options.file_extension, ".parquet");
    /// ```
    pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
    where
        S: Into<String>,
    {
        if let Some(file_extension) = file_extension {
            self.file_extension = file_extension.into();
        }
        self
    }

    /// Set `table partition columns` on [`ListingOptions`] and returns self.
    ///
    /// "partition columns," used to support [Hive Partitioning], are
    /// columns added to the data that is read, based on the folder
    /// structure where the data resides.
    ///
    /// For example, given the following files in your filesystem:
    ///
    /// ```text
    /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
    /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
    /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
    /// ```
    ///
    /// A [`crate::ListingTable`] created at `/mnt/nyctaxi/` with partition
    /// columns "year" and "month" will include new `year` and `month`
    /// columns while reading the files. The `year` column would have
    /// value `2022` and the `month` column would have value `01` for
    /// the rows read from
    /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
    ///
    /// # Notes
    ///
    /// - If only one level (e.g. `year` in the example above) is
    ///   specified, the other levels are ignored but the files are
    ///   still read.
    ///
    /// - Files that don't follow this partitioning scheme will be
    ///   ignored.
    ///
    /// - Since the columns have the same value for all rows read from
    ///   each individual file (such as dates), they are typically
    ///   dictionary encoded for efficiency. You may use
    ///   [`wrap_partition_type_in_dict`] to request a
    ///   dictionary-encoded type.
    ///
    /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself.
    ///
    /// # Example
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow::datatypes::DataType;
    /// # use datafusion_expr::col;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// // listing options for files with paths such as  `/mnt/data/col_a=x/col_b=y/data.parquet`
    /// // `col_a` and `col_b` will be included in the data read from those files
    /// let listing_options = ListingOptions::new(Arc::new(
    ///     ParquetFormat::default()
    ///   ))
    ///   .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
    ///       ("col_b".to_string(), DataType::Utf8)]);
    ///
    /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
    ///     ("col_b".to_string(), DataType::Utf8)]);
    /// ```
    ///
    /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
    /// [`wrap_partition_type_in_dict`]: datafusion_datasource::file_scan_config::wrap_partition_type_in_dict
    pub fn with_table_partition_cols(
        mut self,
        table_partition_cols: Vec<(String, DataType)>,
    ) -> Self {
        self.table_partition_cols = table_partition_cols;
        self
    }

    /// Set stat collection on [`ListingOptions`] and returns self.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// let listing_options =
    ///     ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
    ///
    /// assert_eq!(listing_options.collect_stat, true);
    /// ```
    pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
        self.collect_stat = collect_stat;
        self
    }

    /// Set number of target partitions on [`ListingOptions`] and returns self.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// let listing_options =
    ///     ListingOptions::new(Arc::new(ParquetFormat::default())).with_target_partitions(8);
    ///
    /// assert_eq!(listing_options.target_partitions, 8);
    /// ```
    pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
        self.target_partitions = target_partitions;
        self
    }

    /// Set file sort order on [`ListingOptions`] and returns self.
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_expr::col;
    /// # use datafusion_catalog_listing::ListingOptions;
    /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
    ///
    /// // Tell datafusion that the files are sorted by column "a"
    /// let file_sort_order = vec![vec![col("a").sort(true, true)]];
    ///
    /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
    ///     .with_file_sort_order(file_sort_order.clone());
    ///
    /// assert_eq!(listing_options.file_sort_order, file_sort_order);
    /// ```
    pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
        self.file_sort_order = file_sort_order;
        self
    }

    /// Infer the schema of the files at the given path on the provided object store.
    ///
    /// If the table_path contains one or more files (i.e. it is a directory /
    /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`]
    ///
    /// Note: The inferred schema does not include any partitioning columns.
    ///
    /// This method is called as part of creating a [`crate::ListingTable`].
    ///
    /// # Errors
    ///
    /// Returns an error if the object store cannot be resolved for
    /// `table_path`, if listing the files fails, or if the format's
    /// schema inference fails.
    pub async fn infer_schema<'a>(
        &'a self,
        state: &dyn Session,
        table_path: &'a ListingTableUrl,
    ) -> datafusion_common::Result<SchemaRef> {
        let store = state.runtime_env().object_store(table_path)?;

        let files: Vec<_> = table_path
            .list_all_files(state, store.as_ref(), &self.file_extension)
            .await?
            // Empty files cannot contribute to the schema, and reading
            // them for inference may error, so skip them entirely
            .try_filter(|object_meta| future::ready(object_meta.size > 0))
            .try_collect()
            .await?;

        let schema = self.format.infer_schema(state, &store, &files).await?;

        Ok(schema)
    }

    /// Infers the partition columns stored in `LOCATION` and compares
    /// them with the columns provided in `PARTITIONED BY` to help prevent
    /// accidental corrupts of partitioned tables.
    ///
    /// Allows specifying partial partitions: the declared columns may be
    /// a prefix of the partitioning found on disk.
    pub async fn validate_partitions(
        &self,
        state: &dyn Session,
        table_path: &ListingTableUrl,
    ) -> datafusion_common::Result<()> {
        // Nothing to validate when the table is not declared as partitioned
        if self.table_partition_cols.is_empty() {
            return Ok(());
        }

        // A partitioned layout requires a directory-like (collection) URL
        if !table_path.is_collection() {
            return plan_err!(
                "Can't create a partitioned table backed by a single file, \
                perhaps the URL is missing a trailing slash?"
            );
        }

        let inferred = self.infer_partitions(state, table_path).await?;

        // no partitioned files found on disk
        if inferred.is_empty() {
            return Ok(());
        }

        let table_partition_names = self
            .table_partition_cols
            .iter()
            .map(|(col_name, _)| col_name.clone())
            .collect_vec();

        // The declared columns may be a prefix of the on-disk
        // partitioning, but never longer than it
        if inferred.len() < table_partition_names.len() {
            return plan_err!(
                "Inferred partitions to be {:?}, but got {:?}",
                inferred,
                table_partition_names
            );
        }

        // match prefix to allow creating tables with partial partitions
        for (idx, col) in table_partition_names.iter().enumerate() {
            if &inferred[idx] != col {
                return plan_err!(
                    "Inferred partitions to be {:?}, but got {:?}",
                    inferred,
                    table_partition_names
                );
            }
        }

        Ok(())
    }

    /// Infer the partitioning at the given path on the provided object store.
    /// For performance reasons, it doesn't read all the files on disk
    /// and therefore may fail to detect invalid partitioning.
    ///
    /// Returns the ordered list of partition column names inferred from
    /// the directory layout, or an empty `Vec` when no files were found.
    pub async fn infer_partitions(
        &self,
        state: &dyn Session,
        table_path: &ListingTableUrl,
    ) -> datafusion_common::Result<Vec<String>> {
        let store = state.runtime_env().object_store(table_path)?;

        // only use 10 files for inference
        // This can fail to detect inconsistent partition keys
        // A DFS traversal approach of the store can help here
        let files: Vec<_> = table_path
            .list_all_files(state, store.as_ref(), &self.file_extension)
            .await?
            .take(10)
            .try_collect()
            .await?;

        // Path segments of each file relative to the table root.
        // NOTE(review): files were listed under `table_path`, so stripping
        // its prefix is expected to succeed; the `unwrap` would only panic
        // if the listing returned a location outside the table root.
        let stripped_path_parts = files.iter().map(|file| {
            table_path
                .strip_prefix(&file.location)
                .unwrap()
                .collect_vec()
        });

        // For each file, extract the partition column names from its
        // parent directory segments (those shaped like "column_name=value")
        let partition_keys = stripped_path_parts
            .map(|path_parts| {
                path_parts
                    .into_iter()
                    .rev()
                    .skip(1) // get parents only; skip the file itself
                    .rev()
                    // Partitions are expected to follow the format "column_name=value", so we
                    // should ignore any path part that cannot be parsed into the expected format
                    .filter(|s| s.contains('='))
                    // keep only the column name (the part before the first '=')
                    .map(|s| s.split('=').take(1).collect())
                    .collect_vec()
            })
            .collect_vec();

        // All sampled files must agree on the same sequence of column names
        match partition_keys.into_iter().all_equal_value() {
            // consistent partitioning across every sampled file
            Ok(v) => Ok(v),
            // the iterator was empty (no files sampled): nothing to infer
            Err(None) => Ok(vec![]),
            // two files disagreed on partition columns; report both in a
            // deterministic (sorted) order
            Err(Some(diff)) => {
                let mut sorted_diff = [diff.0, diff.1];
                sorted_diff.sort();
                plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
            }
        }
    }
}