datafusion_catalog_listing/options.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow::datatypes::{DataType, SchemaRef};
19use datafusion_catalog::Session;
20use datafusion_common::plan_err;
21use datafusion_datasource::file_format::FileFormat;
22use datafusion_datasource::ListingTableUrl;
23use datafusion_execution::config::SessionConfig;
24use datafusion_expr::SortExpr;
25use futures::StreamExt;
26use futures::{future, TryStreamExt};
27use itertools::Itertools;
28use std::sync::Arc;
29
30/// Options for creating a [`crate::ListingTable`]
31#[derive(Clone, Debug)]
32pub struct ListingOptions {
33 /// A suffix on which files should be filtered (leave empty to
34 /// keep all files on the path)
35 pub file_extension: String,
36 /// The file format
37 pub format: Arc<dyn FileFormat>,
38 /// The expected partition column names in the folder structure.
39 /// See [Self::with_table_partition_cols] for details
40 pub table_partition_cols: Vec<(String, DataType)>,
41 /// Set true to try to guess statistics from the files.
42 /// This can add a lot of overhead as it will usually require files
43 /// to be opened and at least partially parsed.
44 pub collect_stat: bool,
45 /// Group files to avoid that the number of partitions exceeds
46 /// this limit
47 pub target_partitions: usize,
48 /// Optional pre-known sort order(s). Must be `SortExpr`s.
49 ///
50 /// DataFusion may take advantage of this ordering to omit sorts
51 /// or use more efficient algorithms. Currently sortedness must be
52 /// provided if it is known by some external mechanism, but may in
53 /// the future be automatically determined, for example using
54 /// parquet metadata.
55 ///
56 /// See <https://github.com/apache/datafusion/issues/4177>
57 ///
58 /// NOTE: This attribute stores all equivalent orderings (the outer `Vec`)
59 /// where each ordering consists of an individual lexicographic
60 /// ordering (encapsulated by a `Vec<Expr>`). If there aren't
61 /// multiple equivalent orderings, the outer `Vec` will have a
62 /// single element.
63 pub file_sort_order: Vec<Vec<SortExpr>>,
64}
65
66impl ListingOptions {
67 /// Creates an options instance with the given format
68 /// Default values:
69 /// - use default file extension filter
70 /// - no input partition to discover
71 /// - one target partition
72 /// - do not collect statistics
73 pub fn new(format: Arc<dyn FileFormat>) -> Self {
74 Self {
75 file_extension: format.get_ext(),
76 format,
77 table_partition_cols: vec![],
78 collect_stat: false,
79 target_partitions: 1,
80 file_sort_order: vec![],
81 }
82 }
83
84 /// Set options from [`SessionConfig`] and returns self.
85 ///
86 /// Currently this sets `target_partitions` and `collect_stat`
87 /// but if more options are added in the future that need to be coordinated
88 /// they will be synchronized through this method.
89 pub fn with_session_config_options(mut self, config: &SessionConfig) -> Self {
90 self = self.with_target_partitions(config.target_partitions());
91 self = self.with_collect_stat(config.collect_statistics());
92 self
93 }
94
95 /// Set file extension on [`ListingOptions`] and returns self.
96 ///
97 /// # Example
98 /// ```
99 /// # use std::sync::Arc;
100 /// # use datafusion_catalog_listing::ListingOptions;
101 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
102 ///
103 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
104 /// .with_file_extension(".parquet");
105 ///
106 /// assert_eq!(listing_options.file_extension, ".parquet");
107 /// ```
108 pub fn with_file_extension(mut self, file_extension: impl Into<String>) -> Self {
109 self.file_extension = file_extension.into();
110 self
111 }
112
113 /// Optionally set file extension on [`ListingOptions`] and returns self.
114 ///
115 /// If `file_extension` is `None`, the file extension will not be changed
116 ///
117 /// # Example
118 /// ```
119 /// # use std::sync::Arc;
120 /// # use datafusion_catalog_listing::ListingOptions;
121 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
122 ///
123 /// let extension = Some(".parquet");
124 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
125 /// .with_file_extension_opt(extension);
126 ///
127 /// assert_eq!(listing_options.file_extension, ".parquet");
128 /// ```
129 pub fn with_file_extension_opt<S>(mut self, file_extension: Option<S>) -> Self
130 where
131 S: Into<String>,
132 {
133 if let Some(file_extension) = file_extension {
134 self.file_extension = file_extension.into();
135 }
136 self
137 }
138
139 /// Set `table partition columns` on [`ListingOptions`] and returns self.
140 ///
141 /// "partition columns," used to support [Hive Partitioning], are
142 /// columns added to the data that is read, based on the folder
143 /// structure where the data resides.
144 ///
145 /// For example, give the following files in your filesystem:
146 ///
147 /// ```text
148 /// /mnt/nyctaxi/year=2022/month=01/tripdata.parquet
149 /// /mnt/nyctaxi/year=2021/month=12/tripdata.parquet
150 /// /mnt/nyctaxi/year=2021/month=11/tripdata.parquet
151 /// ```
152 ///
153 /// A [`crate::ListingTable`] created at `/mnt/nyctaxi/` with partition
154 /// columns "year" and "month" will include new `year` and `month`
155 /// columns while reading the files. The `year` column would have
156 /// value `2022` and the `month` column would have value `01` for
157 /// the rows read from
158 /// `/mnt/nyctaxi/year=2022/month=01/tripdata.parquet`
159 ///
160 ///# Notes
161 ///
162 /// - If only one level (e.g. `year` in the example above) is
163 /// specified, the other levels are ignored but the files are
164 /// still read.
165 ///
166 /// - Files that don't follow this partitioning scheme will be
167 /// ignored.
168 ///
169 /// - Since the columns have the same value for all rows read from
170 /// each individual file (such as dates), they are typically
171 /// dictionary encoded for efficiency. You may use
172 /// [`wrap_partition_type_in_dict`] to request a
173 /// dictionary-encoded type.
174 ///
175 /// - The partition columns are solely extracted from the file path. Especially they are NOT part of the parquet files itself.
176 ///
177 /// # Example
178 ///
179 /// ```
180 /// # use std::sync::Arc;
181 /// # use arrow::datatypes::DataType;
182 /// # use datafusion_expr::col;
183 /// # use datafusion_catalog_listing::ListingOptions;
184 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
185 ///
186 /// // listing options for files with paths such as `/mnt/data/col_a=x/col_b=y/data.parquet`
187 /// // `col_a` and `col_b` will be included in the data read from those files
188 /// let listing_options = ListingOptions::new(Arc::new(
189 /// ParquetFormat::default()
190 /// ))
191 /// .with_table_partition_cols(vec![("col_a".to_string(), DataType::Utf8),
192 /// ("col_b".to_string(), DataType::Utf8)]);
193 ///
194 /// assert_eq!(listing_options.table_partition_cols, vec![("col_a".to_string(), DataType::Utf8),
195 /// ("col_b".to_string(), DataType::Utf8)]);
196 /// ```
197 ///
198 /// [Hive Partitioning]: https://docs.cloudera.com/HDPDocuments/HDP2/HDP-2.1.3/bk_system-admin-guide/content/hive_partitioned_tables.html
199 /// [`wrap_partition_type_in_dict`]: datafusion_datasource::file_scan_config::wrap_partition_type_in_dict
200 pub fn with_table_partition_cols(
201 mut self,
202 table_partition_cols: Vec<(String, DataType)>,
203 ) -> Self {
204 self.table_partition_cols = table_partition_cols;
205 self
206 }
207
208 /// Set stat collection on [`ListingOptions`] and returns self.
209 ///
210 /// ```
211 /// # use std::sync::Arc;
212 /// # use datafusion_catalog_listing::ListingOptions;
213 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
214 ///
215 /// let listing_options =
216 /// ListingOptions::new(Arc::new(ParquetFormat::default())).with_collect_stat(true);
217 ///
218 /// assert_eq!(listing_options.collect_stat, true);
219 /// ```
220 pub fn with_collect_stat(mut self, collect_stat: bool) -> Self {
221 self.collect_stat = collect_stat;
222 self
223 }
224
225 /// Set number of target partitions on [`ListingOptions`] and returns self.
226 ///
227 /// ```
228 /// # use std::sync::Arc;
229 /// # use datafusion_catalog_listing::ListingOptions;
230 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
231 ///
232 /// let listing_options =
233 /// ListingOptions::new(Arc::new(ParquetFormat::default())).with_target_partitions(8);
234 ///
235 /// assert_eq!(listing_options.target_partitions, 8);
236 /// ```
237 pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
238 self.target_partitions = target_partitions;
239 self
240 }
241
242 /// Set file sort order on [`ListingOptions`] and returns self.
243 ///
244 /// ```
245 /// # use std::sync::Arc;
246 /// # use datafusion_expr::col;
247 /// # use datafusion_catalog_listing::ListingOptions;
248 /// # use datafusion_datasource_parquet::file_format::ParquetFormat;
249 ///
250 /// // Tell datafusion that the files are sorted by column "a"
251 /// let file_sort_order = vec![vec![col("a").sort(true, true)]];
252 ///
253 /// let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()))
254 /// .with_file_sort_order(file_sort_order.clone());
255 ///
256 /// assert_eq!(listing_options.file_sort_order, file_sort_order);
257 /// ```
258 pub fn with_file_sort_order(mut self, file_sort_order: Vec<Vec<SortExpr>>) -> Self {
259 self.file_sort_order = file_sort_order;
260 self
261 }
262
263 /// Infer the schema of the files at the given path on the provided object store.
264 ///
265 /// If the table_path contains one or more files (i.e. it is a directory /
266 /// prefix of files) their schema is merged by calling [`FileFormat::infer_schema`]
267 ///
268 /// Note: The inferred schema does not include any partitioning columns.
269 ///
270 /// This method is called as part of creating a [`crate::ListingTable`].
271 pub async fn infer_schema<'a>(
272 &'a self,
273 state: &dyn Session,
274 table_path: &'a ListingTableUrl,
275 ) -> datafusion_common::Result<SchemaRef> {
276 let store = state.runtime_env().object_store(table_path)?;
277
278 let files: Vec<_> = table_path
279 .list_all_files(state, store.as_ref(), &self.file_extension)
280 .await?
281 // Empty files cannot affect schema but may throw when trying to read for it
282 .try_filter(|object_meta| future::ready(object_meta.size > 0))
283 .try_collect()
284 .await?;
285
286 let schema = self.format.infer_schema(state, &store, &files).await?;
287
288 Ok(schema)
289 }
290
291 /// Infers the partition columns stored in `LOCATION` and compares
292 /// them with the columns provided in `PARTITIONED BY` to help prevent
293 /// accidental corrupts of partitioned tables.
294 ///
295 /// Allows specifying partial partitions.
296 pub async fn validate_partitions(
297 &self,
298 state: &dyn Session,
299 table_path: &ListingTableUrl,
300 ) -> datafusion_common::Result<()> {
301 if self.table_partition_cols.is_empty() {
302 return Ok(());
303 }
304
305 if !table_path.is_collection() {
306 return plan_err!(
307 "Can't create a partitioned table backed by a single file, \
308 perhaps the URL is missing a trailing slash?"
309 );
310 }
311
312 let inferred = self.infer_partitions(state, table_path).await?;
313
314 // no partitioned files found on disk
315 if inferred.is_empty() {
316 return Ok(());
317 }
318
319 let table_partition_names = self
320 .table_partition_cols
321 .iter()
322 .map(|(col_name, _)| col_name.clone())
323 .collect_vec();
324
325 if inferred.len() < table_partition_names.len() {
326 return plan_err!(
327 "Inferred partitions to be {:?}, but got {:?}",
328 inferred,
329 table_partition_names
330 );
331 }
332
333 // match prefix to allow creating tables with partial partitions
334 for (idx, col) in table_partition_names.iter().enumerate() {
335 if &inferred[idx] != col {
336 return plan_err!(
337 "Inferred partitions to be {:?}, but got {:?}",
338 inferred,
339 table_partition_names
340 );
341 }
342 }
343
344 Ok(())
345 }
346
347 /// Infer the partitioning at the given path on the provided object store.
348 /// For performance reasons, it doesn't read all the files on disk
349 /// and therefore may fail to detect invalid partitioning.
350 pub async fn infer_partitions(
351 &self,
352 state: &dyn Session,
353 table_path: &ListingTableUrl,
354 ) -> datafusion_common::Result<Vec<String>> {
355 let store = state.runtime_env().object_store(table_path)?;
356
357 // only use 10 files for inference
358 // This can fail to detect inconsistent partition keys
359 // A DFS traversal approach of the store can help here
360 let files: Vec<_> = table_path
361 .list_all_files(state, store.as_ref(), &self.file_extension)
362 .await?
363 .take(10)
364 .try_collect()
365 .await?;
366
367 let stripped_path_parts = files.iter().map(|file| {
368 table_path
369 .strip_prefix(&file.location)
370 .unwrap()
371 .collect_vec()
372 });
373
374 let partition_keys = stripped_path_parts
375 .map(|path_parts| {
376 path_parts
377 .into_iter()
378 .rev()
379 .skip(1) // get parents only; skip the file itself
380 .rev()
381 // Partitions are expected to follow the format "column_name=value", so we
382 // should ignore any path part that cannot be parsed into the expected format
383 .filter(|s| s.contains('='))
384 .map(|s| s.split('=').take(1).collect())
385 .collect_vec()
386 })
387 .collect_vec();
388
389 match partition_keys.into_iter().all_equal_value() {
390 Ok(v) => Ok(v),
391 Err(None) => Ok(vec![]),
392 Err(Some(diff)) => {
393 let mut sorted_diff = [diff.0, diff.1];
394 sorted_diff.sort();
395 plan_err!("Found mixed partition values on disk {:?}", sorted_diff)
396 }
397 }
398 }
399}