// datafusion/execution/context/parquet.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use super::super::options::{ParquetReadOptions, ReadOptions};
use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
use datafusion_datasource_parquet::plan_to_parquet;

use datafusion_common::TableReference;
use parquet::file::properties::WriterProperties;

impl SessionContext {
    /// Creates a [`DataFrame`] for reading a Parquet data source.
    ///
    /// For more control, such as reading multiple files, you can use
    /// [`read_table`](Self::read_table) with a [`super::ListingTable`].
    ///
    /// For an example, see [`read_csv`](Self::read_csv).
    ///
    /// # Note: Statistics
    ///
    /// By default, statistics are collected when reading the Parquet
    /// files. This can slow down the initial DataFrame creation while
    /// greatly accelerating queries with certain filters.
    ///
    /// To disable statistics collection, set the [config option]
    /// `datafusion.execution.collect_statistics` to `false`. See
    /// [`ConfigOptions`] and [`ExecutionOptions::collect_statistics`] for more
    /// details.
    ///
    /// [config option]: https://datafusion.apache.org/user-guide/configs.html
    /// [`ConfigOptions`]: crate::config::ConfigOptions
    /// [`ExecutionOptions::collect_statistics`]: crate::config::ExecutionOptions::collect_statistics
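    ///
    /// # Example: disabling statistics collection
    ///
    /// A minimal sketch, assuming the default session setup; the
    /// `"data.parquet"` path below is a placeholder:
    ///
    /// ```no_run
    /// use datafusion::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> datafusion::error::Result<()> {
    /// // Turn off statistics collection to speed up initial DataFrame creation.
    /// let mut config = SessionConfig::new();
    /// config.options_mut().execution.collect_statistics = false;
    /// let ctx = SessionContext::new_with_config(config);
    /// let df = ctx
    ///     .read_parquet("data.parquet", ParquetReadOptions::default())
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```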
    pub async fn read_parquet<P: DataFilePaths>(
        &self,
        table_paths: P,
        options: ParquetReadOptions<'_>,
    ) -> Result<DataFrame> {
        self._read_type(table_paths, options).await
    }

    /// Registers a Parquet file as a table that can be referenced from SQL
    /// statements executed against this context.
    ///
    /// # Note: Statistics
    ///
    /// Statistics are collected by default. See [`read_parquet`] for more
    /// details and how to disable them.
    ///
    /// [`read_parquet`]: Self::read_parquet
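    ///
    /// # Example
    ///
    /// A minimal sketch; the table name and file path are placeholders:
    ///
    /// ```no_run
    /// use datafusion::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> datafusion::error::Result<()> {
    /// let ctx = SessionContext::new();
    /// // Register the file under a table name, then query it from SQL.
    /// ctx.register_parquet("my_table", "data.parquet", ParquetReadOptions::default())
    ///     .await?;
    /// let df = ctx.sql("SELECT * FROM my_table").await?;
    /// # Ok(())
    /// # }
    /// ```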
    pub async fn register_parquet(
        &self,
        table_ref: impl Into<TableReference>,
        table_path: impl AsRef<str>,
        options: ParquetReadOptions<'_>,
    ) -> Result<()> {
        let listing_options = options
            .to_listing_options(&self.copied_config(), self.copied_table_options());

        self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

        self.register_listing_table(
            table_ref,
            table_path,
            listing_options,
            options.schema.map(|s| Arc::new(s.to_owned())),
            None,
        )
        .await?;
        Ok(())
    }

    /// Executes a query and writes the results to one or more partitioned
    /// Parquet files.
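    ///
    /// A minimal sketch; the output path is a placeholder. Build a physical
    /// plan from a [`DataFrame`] and write it out; passing `None` uses the
    /// default [`WriterProperties`]:
    ///
    /// ```no_run
    /// use datafusion::prelude::*;
    /// # #[tokio::main]
    /// # async fn main() -> datafusion::error::Result<()> {
    /// let ctx = SessionContext::new();
    /// let plan = ctx.sql("SELECT 1 AS a").await?.create_physical_plan().await?;
    /// // One Parquet file is written per output partition under the given path.
    /// ctx.write_parquet(plan, "output_dir", None).await?;
    /// # Ok(())
    /// # }
    /// ```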
    pub async fn write_parquet(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        path: impl AsRef<str>,
        writer_properties: Option<WriterProperties>,
    ) -> Result<()> {
        plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array::{Float32Array, Int32Array};
    use crate::arrow::datatypes::{DataType, Field, Schema};
    use crate::arrow::record_batch::RecordBatch;
    use crate::dataframe::DataFrameWriteOptions;
    use crate::parquet::basic::Compression;
    use crate::test_util::parquet_test_data;

    use arrow::util::pretty::pretty_format_batches;
    use datafusion_common::config::TableParquetOptions;
    use datafusion_common::{
        assert_batches_eq, assert_batches_sorted_eq, assert_contains,
    };
    use datafusion_execution::config::SessionConfig;

    use tempfile::{tempdir, TempDir};

    #[tokio::test]
    async fn read_with_glob_path() -> Result<()> {
        let ctx = SessionContext::new();

        let df = ctx
            .read_parquet(
                format!("{}/alltypes_plain*.parquet", parquet_test_data()),
                ParquetReadOptions::default(),
            )
            .await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        // The glob matches alltypes_plain.parquet (8 rows) and
        // alltypes_plain.snappy.parquet (2 rows)
        assert_eq!(total_rows, 10);
        Ok(())
    }

    #[tokio::test]
    async fn read_with_glob_path_issue_2465() -> Result<()> {
        let config =
            SessionConfig::from_string_hash_map(&std::collections::HashMap::from([(
                "datafusion.execution.listing_table_ignore_subdirectory".to_owned(),
                "false".to_owned(),
            )]))?;
        let ctx = SessionContext::new_with_config(config);
        let df = ctx
            .read_parquet(
                // It was reported that when a path contains // (two consecutive separators)
                // no files were found. In this test the path contains a // regardless of the
                // parquet_test_data() value.
                format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
                ParquetReadOptions::default(),
            )
            .await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        // The glob matches alltypes_plain.parquet (8 rows) and
        // alltypes_plain.snappy.parquet (2 rows)
        assert_eq!(total_rows, 10);
        Ok(())
    }

    async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> {
        let ctx = SessionContext::new_with_config(config);

        ctx.register_parquet(
            "test",
            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
            ParquetReadOptions::default(),
        )
        .await?;
        let df = ctx.sql("EXPLAIN SELECT * FROM test").await?;
        let results = df.collect().await?;
        let content = pretty_format_batches(&results).unwrap().to_string();
        Ok(content)
    }

    #[tokio::test]
    async fn register_parquet_respects_collect_statistics_config() -> Result<()> {
        // The default is true
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Exact(");

        // Explicitly set to true
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        config.options_mut().execution.collect_statistics = true;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Exact(");

        // Explicitly set to false
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        config.options_mut().execution.collect_statistics = false;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Absent,");

        Ok(())
    }

    #[tokio::test]
    async fn read_from_registered_table_with_glob_path() -> Result<()> {
        let ctx = SessionContext::new();

        ctx.register_parquet(
            "test",
            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
            ParquetReadOptions::default(),
        )
        .await?;
        let df = ctx.sql("SELECT * FROM test").await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        // The glob matches alltypes_plain.parquet (8 rows) and
        // alltypes_plain.snappy.parquet (2 rows)
        assert_eq!(total_rows, 10);
        Ok(())
    }

    #[tokio::test]
    async fn read_from_different_file_extension() -> Result<()> {
        let ctx = SessionContext::new();
        let sep = std::path::MAIN_SEPARATOR.to_string();

        // Make up a new dataframe.
        let write_df = ctx.read_batch(RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("purchase_id", DataType::Int32, false),
                Field::new("price", DataType::Float32, false),
                Field::new("quantity", DataType::Int32, false),
            ])),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
            ],
        )?)?;

        let temp_dir = tempdir()?;
        let temp_dir_path = temp_dir.path();
        let path1 = temp_dir_path
            .join("output1.parquet")
            .to_str()
            .unwrap()
            .to_string();
        let path2 = temp_dir_path
            .join("output2.parquet.snappy")
            .to_str()
            .unwrap()
            .to_string();
        let path3 = temp_dir_path
            .join("output3.parquet.snappy.parquet")
            .to_str()
            .unwrap()
            .to_string();

        let path4 = temp_dir_path
            .join("output4.parquet".to_owned() + &sep)
            .to_str()
            .unwrap()
            .to_string();

        let path5 = temp_dir_path
            .join("bbb..bbb")
            .join("filename.parquet")
            .to_str()
            .unwrap()
            .to_string();
        let dir = temp_dir_path
            .join("bbb..bbb".to_owned() + &sep)
            .to_str()
            .unwrap()
            .to_string();
        std::fs::create_dir(dir).expect("create dir failed");

        let mut options = TableParquetOptions::default();
        options.global.compression = Some(Compression::SNAPPY.to_string());

        // Write the dataframe to a parquet file named 'output1.parquet'
        write_df
            .clone()
            .write_parquet(
                &path1,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        // Write the dataframe to a parquet file named 'output2.parquet.snappy'
        write_df
            .clone()
            .write_parquet(
                &path2,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        // Write the dataframe to a parquet file named 'output3.parquet.snappy.parquet'
        write_df
            .clone()
            .write_parquet(
                &path3,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        // Write the dataframe to a parquet file named 'bbb..bbb/filename.parquet'
        write_df
            .write_parquet(
                &path5,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await?;

        // Read the dataframe from 'output1.parquet' with the default file extension.
        let read_df = ctx
            .read_parquet(
                &path1,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Read the dataframe from 'output2.parquet.snappy' with the correct file extension.
        let read_df = ctx
            .read_parquet(
                &path2,
                ParquetReadOptions {
                    file_extension: "snappy",
                    ..Default::default()
                },
            )
            .await?;
        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Read the dataframe from 'output2.parquet.snappy' with the default (wrong) file extension.
        let read_df = ctx
            .read_parquet(
                &path2,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await;
        let binding = DataFilePaths::to_urls(&path2).unwrap();
        let expected_path = binding[0].as_str();
        assert_eq!(
            read_df.unwrap_err().strip_backtrace(),
            format!("Execution error: File path '{expected_path}' does not match the expected extension '.parquet'")
        );

        // Read the dataframe from 'output3.parquet.snappy.parquet' with the correct file extension.
        let read_df = ctx
            .read_parquet(
                &path3,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Read the dataframe from the empty directory 'output4.parquet/'.
        std::fs::create_dir(&path4)?;
        let read_df = ctx
            .read_parquet(
                &path4,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 0);

        // Read the dataframe from the double-dot folder 'bbb..bbb'.
        let read_df = ctx
            .read_parquet(
                &path5,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);
        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        // Add a CSV file to check that it is not read by the Parquet reader
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        let actual = ctx
            .read_parquet(&test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder_table() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        // Add a CSV file to check that it is not read by the Parquet reader
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref())
            .await?;

        let actual = ctx
            .sql("SELECT * FROM parquet_folder_t1")
            .await?
            .collect()
            .await?;
        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_dummy_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let test_path = "/foo/";

        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &actual);

        Ok(())
    }
}