// datafusion/execution/context/parquet.rs

use std::sync::Arc;

use super::super::options::{ParquetReadOptions, ReadOptions};
use super::{DataFilePaths, DataFrame, ExecutionPlan, Result, SessionContext};
use datafusion_datasource_parquet::plan_to_parquet;

use datafusion_common::TableReference;
use parquet::file::properties::WriterProperties;

impl SessionContext {
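    /// Creates a [`DataFrame`] that reads the Parquet file(s) at the given
    /// path(s). `table_paths` accepts anything convertible to a list of
    /// paths, including glob patterns (see the tests below).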
    pub async fn read_parquet<P: DataFilePaths>(
        &self,
        table_paths: P,
        options: ParquetReadOptions<'_>,
    ) -> Result<DataFrame> {
        self._read_type(table_paths, options).await
    }

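    /// Registers the Parquet file(s) at `table_path` as a table named
    /// `table_ref`, making them queryable from SQL. The path must match the
    /// file extension configured in `options`.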
    pub async fn register_parquet(
        &self,
        table_ref: impl Into<TableReference>,
        table_path: impl AsRef<str>,
        options: ParquetReadOptions<'_>,
    ) -> Result<()> {
        let listing_options = options
            .to_listing_options(&self.copied_config(), self.copied_table_options());

        self.register_type_check(table_path.as_ref(), &listing_options.file_extension)?;

        self.register_listing_table(
            table_ref,
            table_path,
            listing_options,
            options.schema.map(|s| Arc::new(s.to_owned())),
            None,
        )
        .await?;
        Ok(())
    }

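    /// Executes `plan` and writes its results to `path` as Parquet, using
    /// `writer_properties` when provided.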
    pub async fn write_parquet(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        path: impl AsRef<str>,
        writer_properties: Option<WriterProperties>,
    ) -> Result<()> {
        plan_to_parquet(self.task_ctx(), plan, path, writer_properties).await
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array::{Float32Array, Int32Array};
    use crate::arrow::datatypes::{DataType, Field, Schema};
    use crate::arrow::record_batch::RecordBatch;
    use crate::dataframe::DataFrameWriteOptions;
    use crate::parquet::basic::Compression;
    use crate::test_util::parquet_test_data;

    use arrow::util::pretty::pretty_format_batches;
    use datafusion_common::config::TableParquetOptions;
    use datafusion_common::{
        assert_batches_eq, assert_batches_sorted_eq, assert_contains,
    };
    use datafusion_execution::config::SessionConfig;

    use tempfile::{tempdir, TempDir};

    #[tokio::test]
    async fn read_with_glob_path() -> Result<()> {
        let ctx = SessionContext::new();

        let df = ctx
            .read_parquet(
                format!("{}/alltypes_plain*.parquet", parquet_test_data()),
                ParquetReadOptions::default(),
            )
            .await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 10);
        Ok(())
    }

    #[tokio::test]
    async fn read_with_glob_path_issue_2465() -> Result<()> {
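        // Disable `listing_table_ignore_subdirectory` so the recursive glob
        // below can match files inside subdirectories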
        let config =
            SessionConfig::from_string_hash_map(&std::collections::HashMap::from([(
                "datafusion.execution.listing_table_ignore_subdirectory".to_owned(),
                "false".to_owned(),
            )]))?;
        let ctx = SessionContext::new_with_config(config);
        let df = ctx
            .read_parquet(
                format!("{}/..//*/alltypes_plain*.parquet", parquet_test_data()),
                ParquetReadOptions::default(),
            )
            .await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 10);
        Ok(())
    }

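    /// Registers `alltypes_plain*.parquet` as table `test` and returns the
    /// formatted output of `EXPLAIN SELECT * FROM test`.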
    async fn explain_query_all_with_config(config: SessionConfig) -> Result<String> {
        let ctx = SessionContext::new_with_config(config);

        ctx.register_parquet(
            "test",
            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
            ParquetReadOptions::default(),
        )
        .await?;
        let df = ctx.sql("EXPLAIN SELECT * FROM test").await?;
        let results = df.collect().await?;
        let content = pretty_format_batches(&results).unwrap().to_string();
        Ok(content)
    }

    #[tokio::test]
    async fn register_parquet_respects_collect_statistics_config() -> Result<()> {
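        // No explicit setting: the default configuration collects statistics,
        // so exact row counts are reported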
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Exact(");

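        // Explicitly enabling `collect_statistics` also reports exact row counts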
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        config.options_mut().execution.collect_statistics = true;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Exact(");

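        // Explicitly disabling `collect_statistics` leaves the row count absent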
        let mut config = SessionConfig::new();
        config.options_mut().explain.physical_plan_only = true;
        config.options_mut().explain.show_statistics = true;
        config.options_mut().execution.collect_statistics = false;
        let content = explain_query_all_with_config(config).await?;
        assert_contains!(content, "statistics=[Rows=Absent,");

        Ok(())
    }

    #[tokio::test]
    async fn read_from_registered_table_with_glob_path() -> Result<()> {
        let ctx = SessionContext::new();

        ctx.register_parquet(
            "test",
            &format!("{}/alltypes_plain*.parquet", parquet_test_data()),
            ParquetReadOptions::default(),
        )
        .await?;
        let df = ctx.sql("SELECT * FROM test").await?;
        let results = df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 10);
        Ok(())
    }

    #[tokio::test]
    async fn read_from_different_file_extension() -> Result<()> {
        let ctx = SessionContext::new();
        let sep = std::path::MAIN_SEPARATOR.to_string();

        let write_df = ctx.read_batch(RecordBatch::try_new(
            Arc::new(Schema::new(vec![
                Field::new("purchase_id", DataType::Int32, false),
                Field::new("price", DataType::Float32, false),
                Field::new("quantity", DataType::Int32, false),
            ])),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5])),
                Arc::new(Float32Array::from(vec![1.12, 3.40, 2.33, 9.10, 6.66])),
                Arc::new(Int32Array::from(vec![1, 3, 2, 4, 3])),
            ],
        )?)?;

        let temp_dir = tempdir()?;
        let temp_dir_path = temp_dir.path();
        let path1 = temp_dir_path
            .join("output1.parquet")
            .to_str()
            .unwrap()
            .to_string();
        let path2 = temp_dir_path
            .join("output2.parquet.snappy")
            .to_str()
            .unwrap()
            .to_string();
        let path3 = temp_dir_path
            .join("output3.parquet.snappy.parquet")
            .to_str()
            .unwrap()
            .to_string();

        let path4 = temp_dir_path
            .join("output4.parquet".to_owned() + &sep)
            .to_str()
            .unwrap()
            .to_string();

        let path5 = temp_dir_path
            .join("bbb..bbb")
            .join("filename.parquet")
            .to_str()
            .unwrap()
            .to_string();
        let dir = temp_dir_path
            .join("bbb..bbb".to_owned() + &sep)
            .to_str()
            .unwrap()
            .to_string();
        std::fs::create_dir(dir).expect("create dir failed");

        let mut options = TableParquetOptions::default();
        options.global.compression = Some(Compression::SNAPPY.to_string());

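        // Write the same five-row batch to path1, path2, path3 and path5,
        // each as a single Snappy-compressed Parquet file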
        write_df
            .clone()
            .write_parquet(
                &path1,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        write_df
            .clone()
            .write_parquet(
                &path2,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        write_df
            .clone()
            .write_parquet(
                &path3,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options.clone()),
            )
            .await?;

        write_df
            .write_parquet(
                &path5,
                DataFrameWriteOptions::new().with_single_file_output(true),
                Some(options),
            )
            .await?;

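        // path1 ends in `.parquet`, so the default read options match it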
        let read_df = ctx
            .read_parquet(
                &path1,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

        let read_df = ctx
            .read_parquet(
                &path2,
                ParquetReadOptions {
                    file_extension: "snappy",
                    ..Default::default()
                },
            )
            .await?;
        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

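        // Without overriding `file_extension`, path2's `.snappy` suffix does
        // not match the expected `.parquet` extension and reading must fail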
        let read_df = ctx
            .read_parquet(
                &path2,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await;
        let binding = DataFilePaths::to_urls(&path2).unwrap();
        let expected_path = binding[0].as_str();
        assert_eq!(
            read_df.unwrap_err().strip_backtrace(),
            format!("Execution error: File path '{expected_path}' does not match the expected extension '.parquet'")
        );

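        // path3 ends in `.parquet`, so the default extension still matches
        // even though the name also contains `.snappy`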
        let read_df = ctx
            .read_parquet(
                &path3,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);

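        // path4 is an empty directory with a Parquet-style name: reading it
        // succeeds but yields zero rows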
        std::fs::create_dir(&path4)?;
        let read_df = ctx
            .read_parquet(
                &path4,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 0);

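        // path5 sits inside a directory whose name contains dots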
        let read_df = ctx
            .read_parquet(
                &path5,
                ParquetReadOptions {
                    ..Default::default()
                },
            )
            .await?;

        let results = read_df.collect().await?;
        let total_rows: usize = results.iter().map(|rb| rb.num_rows()).sum();
        assert_eq!(total_rows, 5);
        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

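        // A CSV file in the same folder must be ignored when the folder is
        // read as Parquet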
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        let actual = ctx
            .read_parquet(&test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_from_parquet_folder_table() -> Result<()> {
        let ctx = SessionContext::new();
        let tmp_dir = TempDir::new()?;
        let test_path = tmp_dir.path().to_str().unwrap().to_string();

        ctx.sql("SELECT 1 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql("SELECT 2 a")
            .await?
            .write_parquet(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

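        // The CSV file should likewise be ignored by the external table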
        ctx.sql("SELECT 3 a")
            .await?
            .write_csv(&test_path, DataFrameWriteOptions::default(), None)
            .await?;

        ctx.sql(format!("CREATE EXTERNAL TABLE parquet_folder_t1 STORED AS PARQUET LOCATION '{test_path}'").as_ref())
            .await?;

        let actual = ctx
            .sql("select * from parquet_folder_t1")
            .await?
            .collect()
            .await?;
        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_sorted_eq!(&[
            "+---+",
            "| a |",
            "+---+",
            "| 2 |",
            "| 1 |",
            "+---+",
        ], &actual);

        Ok(())
    }

    #[tokio::test]
    async fn read_dummy_folder() -> Result<()> {
        let ctx = SessionContext::new();
        let test_path = "/foo/";
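
        // A nonexistent path produces an empty result set rather than an error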
        let actual = ctx
            .read_parquet(test_path, ParquetReadOptions::default())
            .await?
            .collect()
            .await?;

        #[cfg_attr(any(), rustfmt::skip)]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &actual);

        Ok(())
    }
}