1pub use datafusion_datasource_parquet::file_format::*;
21
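// Test-only helpers for writing record batches out as temporary parquet files.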
22#[cfg(test)]
23pub(crate) mod test_util {
24 use arrow::array::RecordBatch;
25 use datafusion_common::Result;
26 use object_store::ObjectMeta;
27
28 use crate::test::object_store::local_unpartitioned_file;
29
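// Writes each batch to its own temporary parquet file (bloom filters enabled,
// small data pages when `multi_page` is set) and returns the ObjectMeta handles
// together with the NamedTempFiles that keep the files alive.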
30 pub async fn store_parquet(
39 batches: Vec<RecordBatch>,
40 multi_page: bool,
41 ) -> Result<(Vec<ObjectMeta>, Vec<tempfile::NamedTempFile>)> {
42 const ROWS_PER_PAGE: usize = 2;
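// Writes `batch` in `chunk_size`-row slices so the writer emits multiple data pages.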
44 fn write_in_chunks<W: std::io::Write + Send>(
46 writer: &mut parquet::arrow::ArrowWriter<W>,
47 batch: &RecordBatch,
48 chunk_size: usize,
49 ) {
50 let mut i = 0;
51 while i < batch.num_rows() {
52 let num = chunk_size.min(batch.num_rows() - i);
53 writer.write(&batch.slice(i, num)).unwrap();
54 i += num;
55 }
56 }
57
58 let tmp_files = {
61 let mut tmp_files: Vec<_> = (0..batches.len())
62 .map(|_| tempfile::NamedTempFile::new().expect("creating temp file"))
63 .collect();
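// sort by path so batch i is written to the i-th file in path order (temp file names are random)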
64 tmp_files.sort_by(|a, b| a.path().cmp(b.path()));
65 tmp_files
66 };
67
68 let files: Vec<_> = batches
70 .into_iter()
71 .zip(tmp_files.into_iter())
72 .map(|(batch, mut output)| {
73 let mut builder = parquet::file::properties::WriterProperties::builder();
74 if multi_page {
75 builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE);
76 }
77 builder = builder.set_bloom_filter_enabled(true);
78
79 let props = builder.build();
80
81 let mut writer = parquet::arrow::ArrowWriter::try_new(
82 &mut output,
83 batch.schema(),
84 Some(props),
85 )
86 .expect("creating writer");
87
88 if multi_page {
89 write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
92 } else {
93 writer.write(&batch).expect("Writing batch");
94 }
95 writer.close().unwrap();
96 output
97 })
98 .collect();
99
100 let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
101
102 Ok((meta, files))
103 }
104}
105
106#[cfg(test)]
107mod tests {
108
109 use std::fmt::{self, Display, Formatter};
110 use std::sync::atomic::{AtomicUsize, Ordering};
111 use std::sync::Arc;
112 use std::time::Duration;
113
114 use crate::datasource::file_format::parquet::test_util::store_parquet;
115 use crate::datasource::file_format::test_util::scan_format;
116 use crate::execution::SessionState;
117 use crate::physical_plan::metrics::MetricValue;
118 use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
119
120 use arrow::array::RecordBatch;
121 use arrow_schema::Schema;
122 use datafusion_catalog::Session;
123 use datafusion_common::cast::{
124 as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array,
125 as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
126 };
127 use datafusion_common::config::{ParquetOptions, TableParquetOptions};
128 use datafusion_common::stats::Precision;
129 use datafusion_common::test_util::batches_to_string;
130 use datafusion_common::ScalarValue::Utf8;
131 use datafusion_common::{Result, ScalarValue};
132 use datafusion_datasource::file_format::FileFormat;
133 use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
134 use datafusion_datasource::{ListingTableUrl, PartitionedFile};
135 use datafusion_datasource_parquet::{
136 ParquetFormat, ParquetFormatFactory, ParquetSink,
137 };
138 use datafusion_execution::object_store::ObjectStoreUrl;
139 use datafusion_execution::runtime_env::RuntimeEnv;
140 use datafusion_execution::TaskContext;
141 use datafusion_expr::dml::InsertOp;
142 use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
143 use datafusion_physical_plan::{collect, ExecutionPlan};
144
145 use crate::test_util::bounded_stream;
146 use arrow::array::{
147 types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array,
148 StringArray,
149 };
150 use arrow::datatypes::{DataType, Field};
151 use async_trait::async_trait;
152 use datafusion_datasource::file_groups::FileGroup;
153 use datafusion_datasource_parquet::metadata::DFParquetMetadata;
154 use futures::stream::BoxStream;
155 use futures::StreamExt;
156 use insta::assert_snapshot;
157 use object_store::local::LocalFileSystem;
158 use object_store::ObjectMeta;
159 use object_store::{
160 path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
161 PutMultipartOptions, PutOptions, PutPayload, PutResult,
162 };
163 use parquet::arrow::arrow_reader::ArrowReaderOptions;
164 use parquet::arrow::ParquetRecordBatchStreamBuilder;
165 use parquet::file::metadata::{
166 KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
167 };
168 use parquet::file::page_index::column_index::ColumnIndexMetaData;
169 use tokio::fs::File;
170
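// Controls whether schema inference is asked to force Utf8View / BinaryView types.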
171 enum ForceViews {
172 Yes,
173 No,
174 }
175
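// Writes two single-column batches to separate files and checks the per-file
// statistics fetched against the merged schema.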
176 async fn _run_read_merged_batches(force_views: ForceViews) -> Result<()> {
177 let c1: ArrayRef =
178 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
179
180 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
181
182 let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
183 let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
184
185 let store = Arc::new(LocalFileSystem::new()) as _;
186 let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
187
188 let session = SessionContext::new();
189 let ctx = session.state();
190 let force_views = match force_views {
191 ForceViews::Yes => true,
192 ForceViews::No => false,
193 };
194 let format = ParquetFormat::default().with_force_view_types(force_views);
195 let schema = format.infer_schema(&ctx, &store, &meta).await?;
196
197 let file_metadata_cache =
198 ctx.runtime_env().cache_manager.get_file_metadata_cache();
199 let stats = DFParquetMetadata::new(&store, &meta[0])
200 .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
201 .fetch_statistics(&schema)
202 .await?;
203
204 assert_eq!(stats.num_rows, Precision::Exact(3));
205 let c1_stats = &stats.column_statistics[0];
206 let c2_stats = &stats.column_statistics[1];
207 assert_eq!(c1_stats.null_count, Precision::Exact(1));
208 assert_eq!(c2_stats.null_count, Precision::Exact(3));
209
210 let stats = DFParquetMetadata::new(&store, &meta[1])
211 .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
212 .fetch_statistics(&schema)
213 .await?;
214
215 assert_eq!(stats.num_rows, Precision::Exact(3));
216 let c1_stats = &stats.column_statistics[0];
217 let c2_stats = &stats.column_statistics[1];
218 assert_eq!(c1_stats.null_count, Precision::Exact(3));
219 assert_eq!(c2_stats.null_count, Precision::Exact(1));
220 assert_eq!(
221 c2_stats.max_value,
222 Precision::Exact(ScalarValue::Int64(Some(2)))
223 );
224 assert_eq!(
225 c2_stats.min_value,
226 Precision::Exact(ScalarValue::Int64(Some(1)))
227 );
228
229 Ok(())
230 }
231
232 #[tokio::test]
233 async fn read_merged_batches() -> Result<()> {
234 _run_read_merged_batches(ForceViews::No).await?;
235 _run_read_merged_batches(ForceViews::Yes).await?;
236
237 Ok(())
238 }
239
240 #[tokio::test]
241 async fn is_schema_stable() -> Result<()> {
242 let c1: ArrayRef =
243 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
244
245 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
246
247 let batch1 =
248 RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])?;
249 let batch2 =
250 RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])?;
251
252 let store = Arc::new(LocalFileSystem::new()) as _;
253 let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
254
255 let session = SessionContext::new();
256 let ctx = session.state();
257 let format = ParquetFormat::default();
258 let schema = format.infer_schema(&ctx, &store, &meta).await?;
259
260 let order: Vec<_> = ["a", "b", "c", "d"]
261 .into_iter()
262 .map(|i| i.to_string())
263 .collect();
264 let coll: Vec<_> = schema
265 .flattened_fields()
266 .into_iter()
267 .map(|i| i.name().to_string())
268 .collect();
269 assert_eq!(coll, order);
270
271 Ok(())
272 }
273
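// ObjectStore wrapper that counts GET requests so tests can assert how many
// object-store reads schema inference and metadata caching perform.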
274 #[derive(Debug)]
275 struct RequestCountingObjectStore {
276 inner: Arc<dyn ObjectStore>,
277 request_count: AtomicUsize,
278 }
279
280 impl Display for RequestCountingObjectStore {
281 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
282 write!(f, "RequestCounting({})", self.inner)
283 }
284 }
285
286 impl RequestCountingObjectStore {
287 pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
288 Self {
289 inner,
290 request_count: Default::default(),
291 }
292 }
293
294 pub fn request_count(&self) -> usize {
295 self.request_count.load(Ordering::SeqCst)
296 }
297
298 pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
299 self.clone()
300 }
301 }
302
303 #[async_trait]
304 impl ObjectStore for RequestCountingObjectStore {
305 async fn put_opts(
306 &self,
307 _location: &Path,
308 _payload: PutPayload,
309 _opts: PutOptions,
310 ) -> object_store::Result<PutResult> {
311 Err(object_store::Error::NotImplemented)
312 }
313
314 async fn put_multipart_opts(
315 &self,
316 _location: &Path,
317 _opts: PutMultipartOptions,
318 ) -> object_store::Result<Box<dyn MultipartUpload>> {
319 Err(object_store::Error::NotImplemented)
320 }
321
322 async fn get_opts(
323 &self,
324 location: &Path,
325 options: GetOptions,
326 ) -> object_store::Result<GetResult> {
327 self.request_count.fetch_add(1, Ordering::SeqCst);
328 self.inner.get_opts(location, options).await
329 }
330
331 async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
332 Err(object_store::Error::NotImplemented)
333 }
334
335 async fn delete(&self, _location: &Path) -> object_store::Result<()> {
336 Err(object_store::Error::NotImplemented)
337 }
338
339 fn list(
340 &self,
341 _prefix: Option<&Path>,
342 ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
343 Box::pin(futures::stream::once(async {
344 Err(object_store::Error::NotImplemented)
345 }))
346 }
347
348 async fn list_with_delimiter(
349 &self,
350 _prefix: Option<&Path>,
351 ) -> object_store::Result<ListResult> {
352 Err(object_store::Error::NotImplemented)
353 }
354
355 async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
356 Err(object_store::Error::NotImplemented)
357 }
358
359 async fn copy_if_not_exists(
360 &self,
361 _from: &Path,
362 _to: &Path,
363 ) -> object_store::Result<()> {
364 Err(object_store::Error::NotImplemented)
365 }
366 }
367
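// Exercises metadata fetching with undersized, exact, and oversized size hints,
// asserting the number of object-store GET requests with and without the file
// metadata cache.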
368 async fn _run_fetch_metadata_with_size_hint(force_views: ForceViews) -> Result<()> {
369 let c1: ArrayRef =
370 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
371
372 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
373
374 let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
375 let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
376
377 let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
378 LocalFileSystem::new(),
379 )));
380 let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;
381
382 let session = SessionContext::new();
383 let ctx = session.state();
384
385 let file_metadata_cache =
388 ctx.runtime_env().cache_manager.get_file_metadata_cache();
389 let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
390 .with_metadata_size_hint(Some(9));
391 df_meta.fetch_metadata().await?;
392 assert_eq!(store.request_count(), 2);
393
394 let df_meta =
395 df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
396
397 df_meta.fetch_metadata().await?;
399 assert_eq!(store.request_count(), 5);
400
401 df_meta.fetch_metadata().await?;
403 assert_eq!(store.request_count(), 5);
404
405 let df_meta = df_meta.with_file_metadata_cache(None);
407 df_meta.fetch_metadata().await?;
408 assert_eq!(store.request_count(), 7);
409
410 let force_views = match force_views {
411 ForceViews::Yes => true,
412 ForceViews::No => false,
413 };
414 let format = ParquetFormat::default()
415 .with_metadata_size_hint(Some(9))
416 .with_force_view_types(force_views);
417 let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
419 assert_eq!(store.request_count(), 10);
420 let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
422 assert_eq!(store.request_count(), 10);
423
424 let df_meta =
426 df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
427 let stats = df_meta.fetch_statistics(&schema).await?;
428 assert_eq!(store.request_count(), 10);
429
430 assert_eq!(stats.num_rows, Precision::Exact(3));
431 let c1_stats = &stats.column_statistics[0];
432 let c2_stats = &stats.column_statistics[1];
433 assert_eq!(c1_stats.null_count, Precision::Exact(1));
434 assert_eq!(c2_stats.null_count, Precision::Exact(3));
435
436 let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
437 LocalFileSystem::new(),
438 )));
439
440 let size_hint = meta[0].size as usize;
442 let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
443 .with_metadata_size_hint(Some(size_hint));
444
445 df_meta.fetch_metadata().await?;
446 assert_eq!(store.request_count(), 1);
448
449 let session = SessionContext::new();
450 let ctx = session.state();
451 let file_metadata_cache =
452 ctx.runtime_env().cache_manager.get_file_metadata_cache();
453 let df_meta =
454 df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
455 df_meta.fetch_metadata().await?;
457 assert_eq!(store.request_count(), 2);
458
459 df_meta.fetch_metadata().await?;
461 assert_eq!(store.request_count(), 2);
462
463 let df_meta = df_meta.with_file_metadata_cache(None);
465 df_meta.fetch_metadata().await?;
466 assert_eq!(store.request_count(), 3);
467
468 let format = ParquetFormat::default()
469 .with_metadata_size_hint(Some(size_hint))
470 .with_force_view_types(force_views);
471 let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
473 assert_eq!(store.request_count(), 4);
474 let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
476 assert_eq!(store.request_count(), 4);
477 let df_meta =
479 df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
480 let stats = df_meta.fetch_statistics(&schema).await?;
481 assert_eq!(store.request_count(), 4);
482
483 assert_eq!(stats.num_rows, Precision::Exact(3));
484 let c1_stats = &stats.column_statistics[0];
485 let c2_stats = &stats.column_statistics[1];
486 assert_eq!(c1_stats.null_count, Precision::Exact(1));
487 assert_eq!(c2_stats.null_count, Precision::Exact(3));
488
489 let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
490 LocalFileSystem::new(),
491 )));
492
493 let size_hint = (meta[0].size + 100) as usize;
495 let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
496 .with_metadata_size_hint(Some(size_hint));
497
498 df_meta.fetch_metadata().await?;
499 assert_eq!(store.request_count(), 1);
500
501 let df_meta =
503 df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
504 df_meta.fetch_metadata().await?;
505 assert_eq!(store.request_count(), 1);
506
507 Ok(())
508 }
509
510 #[tokio::test]
511 async fn fetch_metadata_with_size_hint() -> Result<()> {
512 _run_fetch_metadata_with_size_hint(ForceViews::No).await?;
513 _run_fetch_metadata_with_size_hint(ForceViews::Yes).await?;
514
515 Ok(())
516 }
517
518 #[tokio::test]
519 async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
520 let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
522 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
523 let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
524 let c_dic: ArrayRef = Arc::new(dic_array);
525
526 let string_truncation: ArrayRef = Arc::new(StringArray::from(vec![
528 Some("a".repeat(128)),
529 None,
530 Some("b".repeat(128)),
531 None,
532 ]));
533
534 let batch1 = RecordBatch::try_from_iter(vec![
535 ("c_dic", c_dic),
536 ("string_truncation", string_truncation),
537 ])?;
538
539 let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
544 LocalFileSystem::new(),
545 )));
546 let (files, _file_names) = store_parquet(vec![batch1], false).await?;
547
548 let state = SessionContext::new().state();
549 let format = ParquetFormat::default().with_metadata_size_hint(None);
551 let _schema = format.infer_schema(&state, &store.upcast(), &files).await?;
552 assert_eq!(store.request_count(), 3);
553 let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
555 assert_eq!(store.request_count(), 3);
556
557 let file_metadata_cache =
559 state.runtime_env().cache_manager.get_file_metadata_cache();
560 let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
561 .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
562 .fetch_statistics(&schema)
563 .await?;
564 assert_eq!(stats.num_rows, Precision::Exact(4));
565
566 let c_dic_stats = &stats.column_statistics[0];
568
569 assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
570 assert_eq!(
571 c_dic_stats.max_value,
572 Precision::Exact(Utf8(Some("d".into())))
573 );
574 assert_eq!(
575 c_dic_stats.min_value,
576 Precision::Exact(Utf8(Some("a".into())))
577 );
578
579 let string_truncation_stats = &stats.column_statistics[1];
581
582 assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
583 assert_eq!(
584 string_truncation_stats.max_value,
585 Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
586 );
587 assert_eq!(
588 string_truncation_stats.min_value,
589 Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
590 );
591
592 Ok(())
593 }
594
595 async fn _run_test_statistics_from_parquet_metadata(
596 force_views: ForceViews,
597 ) -> Result<()> {
598 let c1: ArrayRef =
600 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
601 let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
602
603 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
605 let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;
606
607 let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
613 LocalFileSystem::new(),
614 )));
615 let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;
616
617 let force_views = match force_views {
618 ForceViews::Yes => true,
619 ForceViews::No => false,
620 };
621
622 let mut state = SessionContext::new().state();
623 state = set_view_state(state, force_views);
624 let format = ParquetFormat::default()
625 .with_force_view_types(force_views)
626 .with_metadata_size_hint(None);
627 let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
628 assert_eq!(store.request_count(), 6);
629
630 let null_i64 = ScalarValue::Int64(None);
631 let null_utf8 = if force_views {
632 ScalarValue::Utf8View(None)
633 } else {
634 Utf8(None)
635 };
636
637 let file_metadata_cache =
639 state.runtime_env().cache_manager.get_file_metadata_cache();
640 let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
641 .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
642 .fetch_statistics(&schema)
643 .await?;
644 assert_eq!(store.request_count(), 6);
645 assert_eq!(stats.num_rows, Precision::Exact(3));
646 let c1_stats = &stats.column_statistics[0];
648 assert_eq!(c1_stats.null_count, Precision::Exact(1));
649 let expected_type = if force_views {
650 ScalarValue::Utf8View
651 } else {
652 Utf8
653 };
654 assert_eq!(
655 c1_stats.max_value,
656 Precision::Exact(expected_type(Some("bar".to_string())))
657 );
658 assert_eq!(
659 c1_stats.min_value,
660 Precision::Exact(expected_type(Some("Foo".to_string())))
661 );
662 let c2_stats = &stats.column_statistics[1];
664 assert_eq!(c2_stats.null_count, Precision::Exact(3));
665 assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
666 assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));
667
668 let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
670 .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
671 .fetch_statistics(&schema)
672 .await?;
673 assert_eq!(store.request_count(), 6);
674 assert_eq!(stats.num_rows, Precision::Exact(3));
675 let c1_stats = &stats.column_statistics[0];
677 assert_eq!(c1_stats.null_count, Precision::Exact(3));
678 assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
679 assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
680 let c2_stats = &stats.column_statistics[1];
682 assert_eq!(c2_stats.null_count, Precision::Exact(1));
683 assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
684 assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));
685
686 Ok(())
687 }
688
689 #[tokio::test]
690 async fn test_statistics_from_parquet_metadata() -> Result<()> {
691 _run_test_statistics_from_parquet_metadata(ForceViews::No).await?;
692
693 _run_test_statistics_from_parquet_metadata(ForceViews::Yes).await?;
694
695 Ok(())
696 }
697
698 #[tokio::test]
699 async fn read_small_batches() -> Result<()> {
700 let config = SessionConfig::new().with_batch_size(2);
701 let session_ctx = SessionContext::new_with_config(config);
702 let state = session_ctx.state();
703 let task_ctx = state.task_ctx();
704 let projection = None;
705 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
706 let stream = exec.execute(0, task_ctx)?;
707
708 let tt_batches = stream
709 .map(|batch| {
710 let batch = batch.unwrap();
711 assert_eq!(11, batch.num_columns());
712 assert_eq!(2, batch.num_rows());
713 })
714 .fold(0, |acc, _| async move { acc + 1i32 })
715 .await;
716
717 assert_eq!(tt_batches, 4);
718
719 assert_eq!(
721 exec.partition_statistics(None)?.num_rows,
722 Precision::Exact(8)
723 );
724 assert_eq!(
726 exec.partition_statistics(None)?.total_byte_size,
727 Precision::Exact(671)
728 );
729
730 Ok(())
731 }
732
733 #[tokio::test]
734 async fn capture_bytes_scanned_metric() -> Result<()> {
735 let config = SessionConfig::new().with_batch_size(2);
736 let session = SessionContext::new_with_config(config);
737 let ctx = session.state();
738
739 let projection = None;
741 let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
742
743 let projection = Some(vec![0]);
745 let exec_projected =
746 get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
747
748 let task_ctx = ctx.task_ctx();
749
750 let _ = collect(exec.clone(), task_ctx.clone()).await?;
751 let _ = collect(exec_projected.clone(), task_ctx).await?;
752
753 assert_bytes_scanned(exec, 671);
754 assert_bytes_scanned(exec_projected, 73);
755
756 Ok(())
757 }
758
759 #[tokio::test]
760 async fn read_limit() -> Result<()> {
761 let session_ctx = SessionContext::new();
762 let state = session_ctx.state();
763 let task_ctx = state.task_ctx();
764 let projection = None;
765 let exec =
766 get_exec(&state, "alltypes_plain.parquet", projection, Some(1)).await?;
767
768 assert_eq!(
770 exec.partition_statistics(None)?.num_rows,
771 Precision::Exact(8)
772 );
773 assert_eq!(
775 exec.partition_statistics(None)?.total_byte_size,
776 Precision::Exact(671)
777 );
778 let batches = collect(exec, task_ctx).await?;
779 assert_eq!(1, batches.len());
780 assert_eq!(11, batches[0].num_columns());
781 assert_eq!(1, batches[0].num_rows());
782
783 Ok(())
784 }
785
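// Registers a parquet format factory whose options force (or disable) view types
// (Utf8View / BinaryView) in inferred schemas.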
786 fn set_view_state(mut state: SessionState, use_views: bool) -> SessionState {
787 let mut options = TableParquetOptions::default();
788 options.global.schema_force_view_types = use_views;
789 state
790 .register_file_format(
791 Arc::new(ParquetFormatFactory::new_with_options(options)),
792 true,
793 )
794 .expect("ok");
795 state
796 }
797
798 async fn _run_read_alltypes_plain_parquet(
799 force_views: ForceViews,
800 expected: &str,
801 ) -> Result<()> {
802 let force_views = match force_views {
803 ForceViews::Yes => true,
804 ForceViews::No => false,
805 };
806
807 let session_ctx = SessionContext::new();
808 let mut state = session_ctx.state();
809 state = set_view_state(state, force_views);
810
811 let task_ctx = state.task_ctx();
812 let projection = None;
813 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
814
815 let x: Vec<String> = exec
816 .schema()
817 .fields()
818 .iter()
819 .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
820 .collect();
821 let y = x.join("\n");
822 assert_eq!(expected, y);
823
824 let batches = collect(exec, task_ctx).await?;
825
826 assert_eq!(1, batches.len());
827 assert_eq!(11, batches[0].num_columns());
828 assert_eq!(8, batches[0].num_rows());
829
830 Ok(())
831 }
832
833 #[tokio::test]
834 async fn read_alltypes_plain_parquet() -> Result<()> {
835 let no_views = "id: Int32\n\
836 bool_col: Boolean\n\
837 tinyint_col: Int32\n\
838 smallint_col: Int32\n\
839 int_col: Int32\n\
840 bigint_col: Int64\n\
841 float_col: Float32\n\
842 double_col: Float64\n\
843 date_string_col: Binary\n\
844 string_col: Binary\n\
845 timestamp_col: Timestamp(Nanosecond, None)";
846 _run_read_alltypes_plain_parquet(ForceViews::No, no_views).await?;
847
848 let with_views = "id: Int32\n\
849 bool_col: Boolean\n\
850 tinyint_col: Int32\n\
851 smallint_col: Int32\n\
852 int_col: Int32\n\
853 bigint_col: Int64\n\
854 float_col: Float32\n\
855 double_col: Float64\n\
856 date_string_col: BinaryView\n\
857 string_col: BinaryView\n\
858 timestamp_col: Timestamp(Nanosecond, None)";
859 _run_read_alltypes_plain_parquet(ForceViews::Yes, with_views).await?;
860
861 Ok(())
862 }
863
864 #[tokio::test]
865 async fn read_bool_alltypes_plain_parquet() -> Result<()> {
866 let session_ctx = SessionContext::new();
867 let state = session_ctx.state();
868 let task_ctx = state.task_ctx();
869 let projection = Some(vec![1]);
870 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
871
872 let batches = collect(exec, task_ctx).await?;
873 assert_eq!(1, batches.len());
874 assert_eq!(1, batches[0].num_columns());
875 assert_eq!(8, batches[0].num_rows());
876
877 let array = as_boolean_array(batches[0].column(0))?;
878 let mut values: Vec<bool> = vec![];
879 for i in 0..batches[0].num_rows() {
880 values.push(array.value(i));
881 }
882
883 assert_eq!(
884 "[true, false, true, false, true, false, true, false]",
885 format!("{values:?}")
886 );
887
888 Ok(())
889 }
890
891 #[tokio::test]
892 async fn read_i32_alltypes_plain_parquet() -> Result<()> {
893 let session_ctx = SessionContext::new();
894 let state = session_ctx.state();
895 let task_ctx = state.task_ctx();
896 let projection = Some(vec![0]);
897 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
898
899 let batches = collect(exec, task_ctx).await?;
900 assert_eq!(1, batches.len());
901 assert_eq!(1, batches[0].num_columns());
902 assert_eq!(8, batches[0].num_rows());
903
904 let array = as_int32_array(batches[0].column(0))?;
905 let mut values: Vec<i32> = vec![];
906 for i in 0..batches[0].num_rows() {
907 values.push(array.value(i));
908 }
909
910 assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));
911
912 Ok(())
913 }
914
915 #[tokio::test]
916 async fn read_i96_alltypes_plain_parquet() -> Result<()> {
917 let session_ctx = SessionContext::new();
918 let state = session_ctx.state();
919 let task_ctx = state.task_ctx();
920 let projection = Some(vec![10]);
921 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
922
923 let batches = collect(exec, task_ctx).await?;
924 assert_eq!(1, batches.len());
925 assert_eq!(1, batches[0].num_columns());
926 assert_eq!(8, batches[0].num_rows());
927
928 let array = as_timestamp_nanosecond_array(batches[0].column(0))?;
929 let mut values: Vec<i64> = vec![];
930 for i in 0..batches[0].num_rows() {
931 values.push(array.value(i));
932 }
933
934 assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{values:?}"));
935
936 Ok(())
937 }
938
939 #[tokio::test]
940 async fn read_f32_alltypes_plain_parquet() -> Result<()> {
941 let session_ctx = SessionContext::new();
942 let state = session_ctx.state();
943 let task_ctx = state.task_ctx();
944 let projection = Some(vec![6]);
945 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
946
947 let batches = collect(exec, task_ctx).await?;
948 assert_eq!(1, batches.len());
949 assert_eq!(1, batches[0].num_columns());
950 assert_eq!(8, batches[0].num_rows());
951
952 let array = as_float32_array(batches[0].column(0))?;
953 let mut values: Vec<f32> = vec![];
954 for i in 0..batches[0].num_rows() {
955 values.push(array.value(i));
956 }
957
958 assert_eq!(
959 "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
960 format!("{values:?}")
961 );
962
963 Ok(())
964 }
965
966 #[tokio::test]
967 async fn read_f64_alltypes_plain_parquet() -> Result<()> {
968 let session_ctx = SessionContext::new();
969 let state = session_ctx.state();
970 let task_ctx = state.task_ctx();
971 let projection = Some(vec![7]);
972 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
973
974 let batches = collect(exec, task_ctx).await?;
975 assert_eq!(1, batches.len());
976 assert_eq!(1, batches[0].num_columns());
977 assert_eq!(8, batches[0].num_rows());
978
979 let array = as_float64_array(batches[0].column(0))?;
980 let mut values: Vec<f64> = vec![];
981 for i in 0..batches[0].num_rows() {
982 values.push(array.value(i));
983 }
984
985 assert_eq!(
986 "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
987 format!("{values:?}")
988 );
989
990 Ok(())
991 }
992
993 #[tokio::test]
994 async fn read_binary_alltypes_plain_parquet() -> Result<()> {
995 let session_ctx = SessionContext::new();
996 let mut state = session_ctx.state();
997 state = set_view_state(state, false);
998
999 let task_ctx = state.task_ctx();
1000 let projection = Some(vec![9]);
1001 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
1002
1003 let batches = collect(exec, task_ctx).await?;
1004 assert_eq!(1, batches.len());
1005 assert_eq!(1, batches[0].num_columns());
1006 assert_eq!(8, batches[0].num_rows());
1007
1008 let array = as_binary_array(batches[0].column(0))?;
1009 let mut values: Vec<&str> = vec![];
1010 for i in 0..batches[0].num_rows() {
1011 values.push(std::str::from_utf8(array.value(i)).unwrap());
1012 }
1013
1014 assert_eq!(
1015 "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
1016 format!("{values:?}")
1017 );
1018
1019 Ok(())
1020 }
1021
1022 #[tokio::test]
1023 async fn read_binaryview_alltypes_plain_parquet() -> Result<()> {
1024 let session_ctx = SessionContext::new();
1025 let mut state = session_ctx.state();
1026 state = set_view_state(state, true);
1027
1028 let task_ctx = state.task_ctx();
1029 let projection = Some(vec![9]);
1030 let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
1031
1032 let batches = collect(exec, task_ctx).await?;
1033 assert_eq!(1, batches.len());
1034 assert_eq!(1, batches[0].num_columns());
1035 assert_eq!(8, batches[0].num_rows());
1036
1037 let array = as_binary_view_array(batches[0].column(0))?;
1038 let mut values: Vec<&str> = vec![];
1039 for i in 0..batches[0].num_rows() {
1040 values.push(std::str::from_utf8(array.value(i)).unwrap());
1041 }
1042
1043 assert_eq!(
1044 "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
1045 format!("{values:?}")
1046 );
1047
1048 Ok(())
1049 }
1050
1051 #[tokio::test]
1052 async fn read_decimal_parquet() -> Result<()> {
1053 let session_ctx = SessionContext::new();
1054 let state = session_ctx.state();
1055 let task_ctx = state.task_ctx();
1056
1057 let exec = get_exec(&state, "int32_decimal.parquet", None, None).await?;
1059 let batches = collect(exec, task_ctx.clone()).await?;
1060 assert_eq!(1, batches.len());
1061 assert_eq!(1, batches[0].num_columns());
1062 let column = batches[0].column(0);
1063 assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
1064
1065 let exec = get_exec(&state, "int64_decimal.parquet", None, None).await?;
1067 let batches = collect(exec, task_ctx.clone()).await?;
1068 assert_eq!(1, batches.len());
1069 assert_eq!(1, batches[0].num_columns());
1070 let column = batches[0].column(0);
1071 assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
1072
1073 let exec = get_exec(&state, "fixed_length_decimal.parquet", None, None).await?;
1075 let batches = collect(exec, task_ctx.clone()).await?;
1076 assert_eq!(1, batches.len());
1077 assert_eq!(1, batches[0].num_columns());
1078 let column = batches[0].column(0);
1079 assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
1080
1081 let exec =
1082 get_exec(&state, "fixed_length_decimal_legacy.parquet", None, None).await?;
1083 let batches = collect(exec, task_ctx.clone()).await?;
1084 assert_eq!(1, batches.len());
1085 assert_eq!(1, batches[0].num_columns());
1086 let column = batches[0].column(0);
1087 assert_eq!(&DataType::Decimal128(13, 2), column.data_type());
1088
1089 let exec = get_exec(&state, "byte_array_decimal.parquet", None, None).await?;
1091 let batches = collect(exec, task_ctx.clone()).await?;
1092 assert_eq!(1, batches.len());
1093 assert_eq!(1, batches[0].num_columns());
1094 let column = batches[0].column(0);
1095 assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
1096
1097 Ok(())
1098 }
1099 #[tokio::test]
1100 async fn test_read_parquet_page_index() -> Result<()> {
1101 let testdata = datafusion_common::test_util::parquet_test_data();
1102 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
1103 let file = File::open(path).await?;
1104 let options = ArrowReaderOptions::new().with_page_index(true);
1105 let metadata =
1106 ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
1107 .await?
1108 .metadata()
1109 .clone();
1110 check_page_index_validation(metadata.column_index(), metadata.offset_index());
1111
1112 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
1113 let file = File::open(path).await?;
1114
1115 let metadata = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
1116 .await?
1117 .metadata()
1118 .clone();
1119 check_page_index_validation(metadata.column_index(), metadata.offset_index());
1120
1121 Ok(())
1122 }
1123
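// Verifies that the page index was loaded: one row group with 13 columns, and
// 325 pages (all with zero null counts) for the Int32 column at index 4.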
1124 fn check_page_index_validation(
1125 page_index: Option<&ParquetColumnIndex>,
1126 offset_index: Option<&ParquetOffsetIndex>,
1127 ) {
1128 assert!(page_index.is_some());
1129 assert!(offset_index.is_some());
1130
1131 let page_index = page_index.unwrap();
1132 let offset_index = offset_index.unwrap();
1133
1134 assert_eq!(page_index.len(), 1);
1136 assert_eq!(offset_index.len(), 1);
1137 let page_index = page_index.first().unwrap();
1138 let offset_index = offset_index.first().unwrap();
1139
1140 assert_eq!(page_index.len(), 13);
1142 assert_eq!(offset_index.len(), 13);
1143
1144 let int_col_index = page_index.get(4).unwrap();
1146 let int_col_offset = offset_index.get(4).unwrap().page_locations();
1147
1148 assert_eq!(int_col_offset.len(), 325);
1150 let ColumnIndexMetaData::INT32(index) = int_col_index else {
1151 panic!("failed to read page index")
1152 };
1153 assert_eq!(index.min_values().len(), 325);
1154 assert_eq!(index.max_values().len(), 325);
1155 for idx in 0..325 {
1157 assert_eq!(index.null_count(idx), Some(0));
1158 }
1159 }
1160
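// Asserts that the sum of the plan's `bytes_scanned` metrics matches `expected`.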
1161 fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
1162 let actual = exec
1163 .metrics()
1164 .expect("Metrics not recorded")
1165 .sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
1166 .map(|t| t.as_usize())
1167 .expect("bytes_scanned metric not recorded");
1168
1169 assert_eq!(actual, expected);
1170 }
1171
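// Builds an ExecutionPlan scanning `file_name` from the parquet test data
// directory, using the session's registered parquet format factory if any.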
1172 async fn get_exec(
1173 state: &dyn Session,
1174 file_name: &str,
1175 projection: Option<Vec<usize>>,
1176 limit: Option<usize>,
1177 ) -> Result<Arc<dyn ExecutionPlan>> {
1178 let testdata = datafusion_common::test_util::parquet_test_data();
1179 let state = state.as_any().downcast_ref::<SessionState>().unwrap();
1180 let format = state
1181 .get_file_format_factory("parquet")
1182 .map(|factory| factory.create(state, &Default::default()).unwrap())
1183 .unwrap_or_else(|| Arc::new(ParquetFormat::new()));
1184
1185 scan_format(
1186 state, &*format, None, &testdata, file_name, projection, limit,
1187 )
1188 .await
1189 }
1190
1191 #[tokio::test]
1193 async fn test_read_empty_parquet() -> Result<()> {
1194 let tmp_dir = tempfile::TempDir::new()?;
1195 let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
1196 File::create(&path).await?;
1197
1198 let ctx = SessionContext::new();
1199
1200 let df = ctx
1201 .read_parquet(&path, ParquetReadOptions::default())
1202 .await
1203 .expect("read_parquet should succeed");
1204
1205 let result = df.collect().await?;
1206
1207 assert_snapshot!(batches_to_string(&result), @r###"
1208 ++
1209 ++
1210 "###);
1211
1212 Ok(())
1213 }
1214
1215 #[tokio::test]
1217 async fn test_read_partitioned_empty_parquet() -> Result<()> {
1218 let tmp_dir = tempfile::TempDir::new()?;
1219 let partition_dir = tmp_dir.path().join("col1=a");
1220 std::fs::create_dir(&partition_dir)?;
1221 File::create(partition_dir.join("empty.parquet")).await?;
1222
1223 let ctx = SessionContext::new();
1224
1225 let df = ctx
1226 .read_parquet(
1227 tmp_dir.path().to_str().unwrap(),
1228 ParquetReadOptions::new()
1229 .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]),
1230 )
1231 .await
1232 .expect("read_parquet should succeed");
1233
1234 let result = df.collect().await?;
1235
1236 assert_snapshot!(batches_to_string(&result), @r###"
1237 ++
1238 ++
1239 "###);
1240
1241 Ok(())
1242 }
1243
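// Builds a TaskContext whose runtime registers a local object store under
// `store_url` and whose config enables single-file parallel parquet writes.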
1244 fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
1245 let tmp_dir = tempfile::TempDir::new().unwrap();
1246 let local = Arc::new(
1247 LocalFileSystem::new_with_prefix(&tmp_dir)
1248 .expect("should create object store"),
1249 );
1250
1251 let mut session = SessionConfig::default();
1252 let parquet_opts = ParquetOptions {
1253 allow_single_file_parallelism: true,
1254 ..Default::default()
1255 };
1257 session.options_mut().execution.parquet = parquet_opts;
1258
1259 let runtime = RuntimeEnv::default();
1260 runtime
1261 .object_store_registry
1262 .register_store(store_url, local);
1263
1264 Arc::new(
1265 TaskContext::default()
1266 .with_session_config(session)
1267 .with_runtime(Arc::new(runtime)),
1268 )
1269 }
1270
1271 #[tokio::test]
1272 async fn parquet_sink_write() -> Result<()> {
1273 let parquet_sink = create_written_parquet_sink("file:///").await?;
1274
1275 let (path, file_metadata) = get_written(parquet_sink)?;
1277 let path_parts = path.parts().collect::<Vec<_>>();
1278 assert_eq!(path_parts.len(), 1, "should not have path prefix");
1279
1280 let expected_kv_meta = vec![
1282 KeyValue {
1284 key: "ARROW:schema".to_string(),
1285 value: Some(ENCODED_ARROW_SCHEMA.to_string()),
1286 },
1287 KeyValue {
1288 key: "my-data".to_string(),
1289 value: Some("stuff".to_string()),
1290 },
1291 KeyValue {
1292 key: "my-data-bool-key".to_string(),
1293 value: None,
1294 },
1295 ];
1296 assert_file_metadata(file_metadata, &expected_kv_meta);
1297
1298 Ok(())
1299 }
1300
1301 #[tokio::test]
1302 async fn parquet_sink_parallel_write() -> Result<()> {
1303 let opts = ParquetOptions {
1304 allow_single_file_parallelism: true,
1305 maximum_parallel_row_group_writers: 2,
1306 maximum_buffered_record_batches_per_stream: 2,
1307 ..Default::default()
1308 };
1309
1310 let parquet_sink =
1311 create_written_parquet_sink_using_config("file:///", opts).await?;
1312
1313 let (path, file_metadata) = get_written(parquet_sink)?;
1315 let path_parts = path.parts().collect::<Vec<_>>();
1316 assert_eq!(path_parts.len(), 1, "should not have path prefix");
1317
1318 let expected_kv_meta = vec![
1320 KeyValue {
1322 key: "ARROW:schema".to_string(),
1323 value: Some(ENCODED_ARROW_SCHEMA.to_string()),
1324 },
1325 KeyValue {
1326 key: "my-data".to_string(),
1327 value: Some("stuff".to_string()),
1328 },
1329 KeyValue {
1330 key: "my-data-bool-key".to_string(),
1331 value: None,
1332 },
1333 ];
1334 assert_file_metadata(file_metadata, &expected_kv_meta);
1335
1336 Ok(())
1337 }
1338
1339 #[tokio::test]
1340 async fn test_write_empty_recordbatch_creates_file() -> Result<()> {
1341 let empty_record_batch = RecordBatch::try_new(
1342 Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
1343 vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
1344 )
1345 .expect("Failed to create empty RecordBatch");
1346
1347 let tmp_dir = tempfile::TempDir::new()?;
1348 let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy());
1349
1350 let ctx = SessionContext::new();
1351 let df = ctx.read_batch(empty_record_batch.clone())?;
1352 df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
1353 .await?;
1354 assert!(std::path::Path::new(&path).exists());
1355
1356 let stream = ctx
1357 .read_parquet(&path, ParquetReadOptions::new())
1358 .await?
1359 .execute_stream()
1360 .await?;
1361 assert_eq!(stream.schema(), empty_record_batch.schema());
1362 let results = stream.collect::<Vec<_>>().await;
1363 assert_eq!(results.len(), 0);
1364 Ok(())
1365 }
1366
1367 #[tokio::test]
1368 async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
1369 let expected_without = vec![
1371 KeyValue {
1372 key: "my-data".to_string(),
1373 value: Some("stuff".to_string()),
1374 },
1375 KeyValue {
1376 key: "my-data-bool-key".to_string(),
1377 value: None,
1378 },
1379 ];
1380 let expected_with = [
1382 vec![KeyValue {
1383 key: "ARROW:schema".to_string(),
1384 value: Some(ENCODED_ARROW_SCHEMA.to_string()),
1385 }],
1386 expected_without.clone(),
1387 ]
1388 .concat();
1389
1390 let opts = ParquetOptions {
1392 allow_single_file_parallelism: false,
1393 skip_arrow_metadata: true,
1394 ..Default::default()
1395 };
1396 let parquet_sink =
1397 create_written_parquet_sink_using_config("file:///", opts).await?;
1398 let (_, file_metadata) = get_written(parquet_sink)?;
1399 assert_file_metadata(file_metadata, &expected_without);
1400
1401 let opts = ParquetOptions {
1403 allow_single_file_parallelism: false,
1404 skip_arrow_metadata: false,
1405 ..Default::default()
1406 };
1407 let parquet_sink =
1408 create_written_parquet_sink_using_config("file:///", opts).await?;
1409 let (_, file_metadata) = get_written(parquet_sink)?;
1410 assert_file_metadata(file_metadata, &expected_with);
1411
1412 let opts = ParquetOptions {
1414 allow_single_file_parallelism: true,
1415 maximum_parallel_row_group_writers: 2,
1416 maximum_buffered_record_batches_per_stream: 2,
1417 skip_arrow_metadata: true,
1418 ..Default::default()
1419 };
1420 let parquet_sink =
1421 create_written_parquet_sink_using_config("file:///", opts).await?;
1422 let (_, file_metadata) = get_written(parquet_sink)?;
1423 assert_file_metadata(file_metadata, &expected_without);
1424
1425 let opts = ParquetOptions {
1427 allow_single_file_parallelism: true,
1428 maximum_parallel_row_group_writers: 2,
1429 maximum_buffered_record_batches_per_stream: 2,
1430 skip_arrow_metadata: false,
1431 ..Default::default()
1432 };
1433 let parquet_sink =
1434 create_written_parquet_sink_using_config("file:///", opts).await?;
1435 let (_, file_metadata) = get_written(parquet_sink)?;
1436 assert_file_metadata(file_metadata, &expected_with);
1437
1438 Ok(())
1439 }
1440
1441 #[tokio::test]
1442 async fn parquet_sink_write_with_extension() -> Result<()> {
1443 let filename = "test_file.custom_ext";
1444 let file_path = format!("file:///path/to/{filename}");
1445 let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?;
1446
1447 let (path, _) = get_written(parquet_sink)?;
1449 let path_parts = path.parts().collect::<Vec<_>>();
1450 assert_eq!(
1451 path_parts.len(),
1452 3,
1453 "Expected 3 path parts, instead found {}",
1454 path_parts.len()
1455 );
1456 assert_eq!(path_parts.last().unwrap().as_ref(), filename);
1457
1458 Ok(())
1459 }
1460
1461 #[tokio::test]
1462 async fn parquet_sink_write_with_directory_name() -> Result<()> {
1463 let file_path = "file:///path/to";
1464 let parquet_sink = create_written_parquet_sink(file_path).await?;
1465
1466 let (path, _) = get_written(parquet_sink)?;
1468 let path_parts = path.parts().collect::<Vec<_>>();
1469 assert_eq!(
1470 path_parts.len(),
1471 3,
1472 "Expected 3 path parts, instead found {}",
1473 path_parts.len()
1474 );
1475 assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
1476
1477 Ok(())
1478 }
1479
1480 #[tokio::test]
1481 async fn parquet_sink_write_with_folder_ending() -> Result<()> {
1482 let file_path = "file:///path/to/";
1483 let parquet_sink = create_written_parquet_sink(file_path).await?;
1484
1485 let (path, _) = get_written(parquet_sink)?;
1487 let path_parts = path.parts().collect::<Vec<_>>();
1488 assert_eq!(
1489 path_parts.len(),
1490 3,
1491 "Expected 3 path parts, instead found {}",
1492 path_parts.len()
1493 );
1494 assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));
1495
1496 Ok(())
1497 }
1498
1499 async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
1500 create_written_parquet_sink_using_config(table_path, ParquetOptions::default())
1501 .await
1502 }
1503
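// Expected value of the "ARROW:schema" key/value entry: the base64-encoded Arrow
// schema for the two Utf8 columns ("a", "b") written by these tests.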
1504 static ENCODED_ARROW_SCHEMA: &str = "/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA";
1505
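// Writes a two-row batch through a ParquetSink configured with the given options
// and custom key/value metadata, returning the sink for inspection.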
1506 async fn create_written_parquet_sink_using_config(
1507 table_path: &str,
1508 global: ParquetOptions,
1509 ) -> Result<Arc<ParquetSink>> {
1510 let field_a = Field::new("a", DataType::Utf8, false);
1512 let field_b = Field::new("b", DataType::Utf8, false);
1513 let schema = Arc::new(Schema::new(vec![field_a, field_b]));
1514 let object_store_url = ObjectStoreUrl::local_filesystem();
1515
1516 let file_sink_config = FileSinkConfig {
1517 original_url: String::default(),
1518 object_store_url: object_store_url.clone(),
1519 file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
1520 table_paths: vec![ListingTableUrl::parse(table_path)?],
1521 output_schema: schema.clone(),
1522 table_partition_cols: vec![],
1523 insert_op: InsertOp::Overwrite,
1524 keep_partition_by_columns: false,
1525 file_extension: "parquet".into(),
1526 };
1527 let parquet_sink = Arc::new(ParquetSink::new(
1528 file_sink_config,
1529 TableParquetOptions {
1530 key_value_metadata: std::collections::HashMap::from([
1531 ("my-data".to_string(), Some("stuff".to_string())),
1532 ("my-data-bool-key".to_string(), None),
1533 ]),
1534 global,
1535 ..Default::default()
1536 },
1537 ));
1538
1539 let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
1541 let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
1542 let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
1543
1544 FileSink::write_all(
1546 parquet_sink.as_ref(),
1547 Box::pin(RecordBatchStreamAdapter::new(
1548 schema,
1549 futures::stream::iter(vec![Ok(batch)]),
1550 )),
1551 &build_ctx(object_store_url.as_ref()),
1552 )
1553 .await?;
1554
1555 Ok(parquet_sink)
1556 }
1557
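// Extracts the single (path, ParquetMetaData) pair recorded by the sink.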
1558 fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, ParquetMetaData)> {
1559 let mut written = parquet_sink.written();
1560 let written = written.drain();
1561 assert_eq!(
1562 written.len(),
1563 1,
1564 "expected a single parquet files to be written, instead found {}",
1565 written.len()
1566 );
1567
1568 let (path, parquet_meta_data) = written.take(1).next().unwrap();
1569 Ok((path, parquet_meta_data))
1570 }
1571
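// Checks row count, column names, and (sorted) key/value metadata of the written file.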
1572 fn assert_file_metadata(
1573 parquet_meta_data: ParquetMetaData,
1574 expected_kv: &Vec<KeyValue>,
1575 ) {
1576 let file_metadata = parquet_meta_data.file_metadata();
1577 let schema_descr = file_metadata.schema_descr();
1578 assert_eq!(file_metadata.num_rows(), 2, "expected file metadata to have 2 rows");
1579 assert!(
1580 schema_descr
1581 .columns()
1582 .iter()
1583 .any(|col_schema| col_schema.name() == "a"),
1584 "output file metadata should contain col a"
1585 );
1586 assert!(
1587 schema_descr
1588 .columns()
1589 .iter()
1590 .any(|col_schema| col_schema.name() == "b"),
1591 "output file metadata should contain col b"
1592 );
1593
1594 let mut key_value_metadata = file_metadata.key_value_metadata().unwrap().clone();
1595 key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
1596 assert_eq!(&key_value_metadata, expected_kv);
1597 }
1598
1599 #[tokio::test]
1600 async fn parquet_sink_write_partitions() -> Result<()> {
1601 let field_a = Field::new("a", DataType::Utf8, false);
1602 let field_b = Field::new("b", DataType::Utf8, false);
1603 let schema = Arc::new(Schema::new(vec![field_a, field_b]));
1604 let object_store_url = ObjectStoreUrl::local_filesystem();
1605
1606 let file_sink_config = FileSinkConfig {
1608 original_url: String::default(),
1609 object_store_url: object_store_url.clone(),
1610 file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
1611 table_paths: vec![ListingTableUrl::parse("file:///")?],
1612 output_schema: schema.clone(),
1613 table_partition_cols: vec![("a".to_string(), DataType::Utf8)],
1614 insert_op: InsertOp::Overwrite,
1615 keep_partition_by_columns: false,
1616 file_extension: "parquet".into(),
1617 };
1618 let parquet_sink = Arc::new(ParquetSink::new(
1619 file_sink_config,
1620 TableParquetOptions::default(),
1621 ));
1622
1623 let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
1625 let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
1626 let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
1627
1628 FileSink::write_all(
1630 parquet_sink.as_ref(),
1631 Box::pin(RecordBatchStreamAdapter::new(
1632 schema,
1633 futures::stream::iter(vec![Ok(batch)]),
1634 )),
1635 &build_ctx(object_store_url.as_ref()),
1636 )
1637 .await?;
1638
1639 let mut written = parquet_sink.written();
1641 let written = written.drain();
1642 assert_eq!(
1643 written.len(),
1644 2,
1645 "expected two parquet files to be written, instead found {}",
1646 written.len()
1647 );
1648
1649 let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
1651 for (path, parquet_metadata) in written.take(2) {
1652 let file_metadata = parquet_metadata.file_metadata();
1653 let schema = file_metadata.schema_descr();
1654 let num_rows = file_metadata.num_rows();
1655
1656 let path_parts = path.parts().collect::<Vec<_>>();
1657 assert_eq!(path_parts.len(), 2, "should have path prefix");
1658
1659 let prefix = path_parts[0].as_ref();
1660 assert!(
1661 expected_partitions.contains(prefix),
1662 "expected path prefix to match partition, instead found {prefix:?}"
1663 );
1664 expected_partitions.remove(prefix);
1665
1666 assert_eq!(num_rows, 1, "expected file metadata to have 1 row");
1667 assert!(
1668 !schema
1669 .columns()
1670 .iter()
1671 .any(|col_schema| col_schema.name() == "a"),
1672 "output file metadata will not contain partitioned col a"
1673 );
1674 assert!(
1675 schema
1676 .columns()
1677 .iter()
1678 .any(|col_schema| col_schema.name() == "b"),
1679 "output file metadata should contain col b"
1680 );
1681 }
1682
1683 Ok(())
1684 }
1685
1686 #[tokio::test]
1687 async fn parquet_sink_write_memory_reservation() -> Result<()> {
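// Streams 1000 copies of a small batch through the sink, polling the write task
// and checking that memory is reserved while writing and fully released afterwards.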
1688 async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
1689 let field_a = Field::new("a", DataType::Utf8, false);
1690 let field_b = Field::new("b", DataType::Utf8, false);
1691 let schema = Arc::new(Schema::new(vec![field_a, field_b]));
1692 let object_store_url = ObjectStoreUrl::local_filesystem();
1693
1694 let file_sink_config = FileSinkConfig {
1695 original_url: String::default(),
1696 object_store_url: object_store_url.clone(),
1697 file_group: FileGroup::new(vec![PartitionedFile::new(
1698 "/tmp".to_string(),
1699 1,
1700 )]),
1701 table_paths: vec![ListingTableUrl::parse("file:///")?],
1702 output_schema: schema.clone(),
1703 table_partition_cols: vec![],
1704 insert_op: InsertOp::Overwrite,
1705 keep_partition_by_columns: false,
1706 file_extension: "parquet".into(),
1707 };
1708 let parquet_sink = Arc::new(ParquetSink::new(
1709 file_sink_config,
1710 TableParquetOptions {
1711 key_value_metadata: std::collections::HashMap::from([
1712 ("my-data".to_string(), Some("stuff".to_string())),
1713 ("my-data-bool-key".to_string(), None),
1714 ]),
1715 global,
1716 ..Default::default()
1717 },
1718 ));
1719
1720 let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
1722 let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
1723 let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;
1724
1725 let task_context = build_ctx(object_store_url.as_ref());
1727 assert_eq!(
1728 task_context.memory_pool().reserved(),
1729 0,
1730 "no bytes are reserved yet"
1731 );
1732
1733 let mut write_task = FileSink::write_all(
1734 parquet_sink.as_ref(),
1735 Box::pin(RecordBatchStreamAdapter::new(
1736 schema,
1737 bounded_stream(batch, 1000),
1738 )),
1739 &task_context,
1740 );
1741
1742 let mut reserved_bytes = 0;
1744 while futures::poll!(&mut write_task).is_pending() {
1745 reserved_bytes += task_context.memory_pool().reserved();
1746 tokio::time::sleep(Duration::from_micros(1)).await;
1747 }
1748 assert!(
1749 reserved_bytes > 0,
1750 "should have bytes reserved during write"
1751 );
1752 assert_eq!(
1753 task_context.memory_pool().reserved(),
1754 0,
1755 "no leaking byte reservation"
1756 );
1757
1758 Ok(())
1759 }
1760
1761 let write_opts = ParquetOptions {
1762 allow_single_file_parallelism: false,
1763 ..Default::default()
1764 };
1765 test_memory_reservation(write_opts)
1766 .await
1767 .expect("should track for non-parallel writes");
1768
1769 let row_parallel_write_opts = ParquetOptions {
1770 allow_single_file_parallelism: true,
1771 maximum_parallel_row_group_writers: 10,
1772 maximum_buffered_record_batches_per_stream: 1,
1773 ..Default::default()
1774 };
1775 test_memory_reservation(row_parallel_write_opts)
1776 .await
1777 .expect("should track for row-parallel writes");
1778
1779 let col_parallel_write_opts = ParquetOptions {
1780 allow_single_file_parallelism: true,
1781 maximum_parallel_row_group_writers: 1,
1782 maximum_buffered_record_batches_per_stream: 2,
1783 ..Default::default()
1784 };
1785 test_memory_reservation(col_parallel_write_opts)
1786 .await
1787 .expect("should track for column-parallel writes");
1788
1789 Ok(())
1790 }
1791}