datafusion/datasource/file_format/parquet.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Re-exports the [`datafusion_datasource_parquet::file_format`] module, and contains tests for it.

pub use datafusion_datasource_parquet::file_format::*;

#[cfg(test)]
pub(crate) mod test_util {
    use arrow::array::RecordBatch;
    use datafusion_common::Result;
    use object_store::ObjectMeta;

    use crate::test::object_store::local_unpartitioned_file;

    /// Writes each batch in `batches` to its own temporary parquet file.
    ///
    /// For example, if `batches` contains 2 batches, the function creates
    /// 2 temporary files, each containing the contents of one batch.
    ///
    /// If `multi_page` is `true`, the parquet file(s) are written with
    /// 2 rows per data page (used to test page filtering and boundaries).
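    ///
    /// # Example
    ///
    /// A minimal usage sketch (not compiled as a doc test; assumes `batch` is a
    /// [`RecordBatch`] built elsewhere in the test):
    ///
    /// ```ignore
    /// let (metas, _tmp_files) = store_parquet(vec![batch], /* multi_page */ false).await?;
    /// assert_eq!(metas.len(), 1);
    /// ```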
    pub async fn store_parquet(
        batches: Vec<RecordBatch>,
        multi_page: bool,
    ) -> Result<(Vec<ObjectMeta>, Vec<tempfile::NamedTempFile>)> {
        /// How many rows per page should be written
        const ROWS_PER_PAGE: usize = 2;
        /// Writes `batch` to `writer` in chunks of `chunk_size` rows
        fn write_in_chunks<W: std::io::Write + Send>(
            writer: &mut parquet::arrow::ArrowWriter<W>,
            batch: &RecordBatch,
            chunk_size: usize,
        ) {
            let mut i = 0;
            while i < batch.num_rows() {
                let num = chunk_size.min(batch.num_rows() - i);
                writer.write(&batch.slice(i, num)).unwrap();
                i += num;
            }
        }

        // The tmp files must be sorted, as some tests rely on the returned file ordering
        // https://github.com/apache/datafusion/pull/6629
        let tmp_files = {
            let mut tmp_files: Vec<_> = (0..batches.len())
                .map(|_| tempfile::NamedTempFile::new().expect("creating temp file"))
                .collect();
            tmp_files.sort_by(|a, b| a.path().cmp(b.path()));
            tmp_files
        };

        // Each batch is written to its own file
        let files: Vec<_> = batches
            .into_iter()
            .zip(tmp_files.into_iter())
            .map(|(batch, mut output)| {
                let mut builder = parquet::file::properties::WriterProperties::builder();
                if multi_page {
                    builder = builder.set_data_page_row_count_limit(ROWS_PER_PAGE)
                }
                builder = builder.set_bloom_filter_enabled(true);

                let props = builder.build();

                let mut writer = parquet::arrow::ArrowWriter::try_new(
                    &mut output,
                    batch.schema(),
                    Some(props),
                )
                .expect("creating writer");

                if multi_page {
                    // write in smaller batches as the parquet writer
                    // only checks datapage size limits on the boundaries of each batch
                    write_in_chunks(&mut writer, &batch, ROWS_PER_PAGE);
                } else {
                    writer.write(&batch).expect("Writing batch");
                };
                writer.close().unwrap();
                output
            })
            .collect();

        let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();

        Ok((meta, files))
    }
}

#[cfg(test)]
mod tests {

    use std::fmt::{self, Display, Formatter};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    use crate::datasource::file_format::parquet::test_util::store_parquet;
    use crate::datasource::file_format::test_util::scan_format;
    use crate::execution::SessionState;
    use crate::physical_plan::metrics::MetricValue;
    use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

    use arrow::array::RecordBatch;
    use arrow_schema::Schema;
    use datafusion_catalog::Session;
    use datafusion_common::cast::{
        as_binary_array, as_binary_view_array, as_boolean_array, as_float32_array,
        as_float64_array, as_int32_array, as_timestamp_nanosecond_array,
    };
    use datafusion_common::config::{ParquetOptions, TableParquetOptions};
    use datafusion_common::stats::Precision;
    use datafusion_common::test_util::batches_to_string;
    use datafusion_common::ScalarValue::Utf8;
    use datafusion_common::{Result, ScalarValue};
    use datafusion_datasource::file_format::FileFormat;
    use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
    use datafusion_datasource::{ListingTableUrl, PartitionedFile};
    use datafusion_datasource_parquet::{
        ParquetFormat, ParquetFormatFactory, ParquetSink,
    };
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use datafusion_execution::TaskContext;
    use datafusion_expr::dml::InsertOp;
    use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
    use datafusion_physical_plan::{collect, ExecutionPlan};

    use crate::test_util::bounded_stream;
    use arrow::array::{
        types::Int32Type, Array, ArrayRef, DictionaryArray, Int32Array, Int64Array,
        StringArray,
    };
    use arrow::datatypes::{DataType, Field};
    use async_trait::async_trait;
    use datafusion_datasource::file_groups::FileGroup;
    use datafusion_datasource_parquet::metadata::DFParquetMetadata;
    use futures::stream::BoxStream;
    use futures::StreamExt;
    use insta::assert_snapshot;
    use object_store::local::LocalFileSystem;
    use object_store::ObjectMeta;
    use object_store::{
        path::Path, GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore,
        PutMultipartOptions, PutOptions, PutPayload, PutResult,
    };
    use parquet::arrow::arrow_reader::ArrowReaderOptions;
    use parquet::arrow::ParquetRecordBatchStreamBuilder;
    use parquet::file::metadata::{
        KeyValue, ParquetColumnIndex, ParquetMetaData, ParquetOffsetIndex,
    };
    use parquet::file::page_index::column_index::ColumnIndexMetaData;
    use tokio::fs::File;

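    /// Controls whether a test helper runs with `schema_force_view_types`
    /// enabled (`Utf8View`/`BinaryView`) or with the plain string/binary types.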
    enum ForceViews {
        Yes,
        No,
    }

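    /// Writes two single-column batches (`c1`, `c2`) to separate parquet files,
    /// infers a merged schema across both files, and checks the per-file
    /// statistics (row counts, null counts, min/max) against that schema.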
    async fn _run_read_merged_batches(force_views: ForceViews) -> Result<()> {
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;

        let store = Arc::new(LocalFileSystem::new()) as _;
        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

        let session = SessionContext::new();
        let ctx = session.state();
        let force_views = match force_views {
            ForceViews::Yes => true,
            ForceViews::No => false,
        };
        let format = ParquetFormat::default().with_force_view_types(force_views);
        let schema = format.infer_schema(&ctx, &store, &meta).await?;

        let file_metadata_cache =
            ctx.runtime_env().cache_manager.get_file_metadata_cache();
        let stats = DFParquetMetadata::new(&store, &meta[0])
            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
            .fetch_statistics(&schema)
            .await?;

        assert_eq!(stats.num_rows, Precision::Exact(3));
        let c1_stats = &stats.column_statistics[0];
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c1_stats.null_count, Precision::Exact(1));
        assert_eq!(c2_stats.null_count, Precision::Exact(3));

        let stats = DFParquetMetadata::new(&store, &meta[1])
            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
            .fetch_statistics(&schema)
            .await?;

        assert_eq!(stats.num_rows, Precision::Exact(3));
        let c1_stats = &stats.column_statistics[0];
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c1_stats.null_count, Precision::Exact(3));
        assert_eq!(c2_stats.null_count, Precision::Exact(1));
        assert_eq!(
            c2_stats.max_value,
            Precision::Exact(ScalarValue::Int64(Some(2)))
        );
        assert_eq!(
            c2_stats.min_value,
            Precision::Exact(ScalarValue::Int64(Some(1)))
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_merged_batches() -> Result<()> {
        _run_read_merged_batches(ForceViews::No).await?;
        _run_read_merged_batches(ForceViews::Yes).await?;

        Ok(())
    }

    #[tokio::test]
    async fn is_schema_stable() -> Result<()> {
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        let batch1 =
            RecordBatch::try_from_iter(vec![("a", c1.clone()), ("b", c1.clone())])?;
        let batch2 =
            RecordBatch::try_from_iter(vec![("c", c2.clone()), ("d", c2.clone())])?;

        let store = Arc::new(LocalFileSystem::new()) as _;
        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

        let session = SessionContext::new();
        let ctx = session.state();
        let format = ParquetFormat::default();
        let schema = format.infer_schema(&ctx, &store, &meta).await?;

        let order: Vec<_> = ["a", "b", "c", "d"]
            .into_iter()
            .map(|i| i.to_string())
            .collect();
        let coll: Vec<_> = schema
            .flattened_fields()
            .into_iter()
            .map(|i| i.name().to_string())
            .collect();
        assert_eq!(coll, order);

        Ok(())
    }

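    /// An [`ObjectStore`] wrapper that counts `get_opts` requests made to the
    /// inner store, so tests can assert how many object store reads an
    /// operation performs. All other operations return `NotImplemented`.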
    #[derive(Debug)]
    struct RequestCountingObjectStore {
        inner: Arc<dyn ObjectStore>,
        request_count: AtomicUsize,
    }

    impl Display for RequestCountingObjectStore {
        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
            write!(f, "RequestCounting({})", self.inner)
        }
    }

    impl RequestCountingObjectStore {
        pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
            Self {
                inner,
                request_count: Default::default(),
            }
        }

        pub fn request_count(&self) -> usize {
            self.request_count.load(Ordering::SeqCst)
        }

        pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
            self.clone()
        }
    }

    #[async_trait]
    impl ObjectStore for RequestCountingObjectStore {
        async fn put_opts(
            &self,
            _location: &Path,
            _payload: PutPayload,
            _opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            Err(object_store::Error::NotImplemented)
        }

        async fn put_multipart_opts(
            &self,
            _location: &Path,
            _opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            Err(object_store::Error::NotImplemented)
        }

        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.request_count.fetch_add(1, Ordering::SeqCst);
            self.inner.get_opts(location, options).await
        }

        async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
            Err(object_store::Error::NotImplemented)
        }

        async fn delete(&self, _location: &Path) -> object_store::Result<()> {
            Err(object_store::Error::NotImplemented)
        }

        fn list(
            &self,
            _prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            Box::pin(futures::stream::once(async {
                Err(object_store::Error::NotImplemented)
            }))
        }

        async fn list_with_delimiter(
            &self,
            _prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            Err(object_store::Error::NotImplemented)
        }

        async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
            Err(object_store::Error::NotImplemented)
        }

        async fn copy_if_not_exists(
            &self,
            _from: &Path,
            _to: &Path,
        ) -> object_store::Result<()> {
            Err(object_store::Error::NotImplemented)
        }
    }

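    /// Exercises `DFParquetMetadata` metadata-size hints together with the
    /// file metadata cache, asserting how many object store requests each
    /// fetch issues for hints smaller than, equal to, and larger than the file.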
    async fn _run_fetch_metadata_with_size_hint(force_views: ForceViews) -> Result<()> {
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;
        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;

        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
        let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

        let session = SessionContext::new();
        let ctx = session.state();

        // Use a size hint larger than the parquet footer but smaller than the actual
        // metadata, requiring a second fetch for the remaining metadata
        let file_metadata_cache =
            ctx.runtime_env().cache_manager.get_file_metadata_cache();
        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
            .with_metadata_size_hint(Some(9));
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 2);

        let df_meta =
            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));

        // Increases by 3 because the cache has no entries yet
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 5);

        // No increase because the cache has an entry
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 5);

        // Increases by 2 because the file metadata cache is `None`
        let df_meta = df_meta.with_file_metadata_cache(None);
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 7);

        let force_views = match force_views {
            ForceViews::Yes => true,
            ForceViews::No => false,
        };
        let format = ParquetFormat::default()
            .with_metadata_size_hint(Some(9))
            .with_force_view_types(force_views);
        // Increases by 3, partial cache being used.
        let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
        assert_eq!(store.request_count(), 10);
        // No increase, full cache being used.
        let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
        assert_eq!(store.request_count(), 10);

        // No increase, cache being used
        let df_meta =
            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
        let stats = df_meta.fetch_statistics(&schema).await?;
        assert_eq!(store.request_count(), 10);

        assert_eq!(stats.num_rows, Precision::Exact(3));
        let c1_stats = &stats.column_statistics[0];
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c1_stats.null_count, Precision::Exact(1));
        assert_eq!(c2_stats.null_count, Precision::Exact(3));

        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));

        // Use the file size as the hint so we can get the full metadata from the first fetch
        let size_hint = meta[0].size as usize;
        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
            .with_metadata_size_hint(Some(size_hint));

        df_meta.fetch_metadata().await?;
        // ensure the requests were coalesced into a single request
        assert_eq!(store.request_count(), 1);

        let session = SessionContext::new();
        let ctx = session.state();
        let file_metadata_cache =
            ctx.runtime_env().cache_manager.get_file_metadata_cache();
        let df_meta =
            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
        // Increases by 1 because the cache of this new session context has no entries yet
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 2);

        // No increase because the cache has an entry
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 2);

        // Increases by 1 because the file metadata cache is `None`
        let df_meta = df_meta.with_file_metadata_cache(None);
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 3);

        let format = ParquetFormat::default()
            .with_metadata_size_hint(Some(size_hint))
            .with_force_view_types(force_views);
        // Increases by 1, partial cache being used.
        let _schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
        assert_eq!(store.request_count(), 4);
        // No increase, full cache being used.
        let schema = format.infer_schema(&ctx, &store.upcast(), &meta).await?;
        assert_eq!(store.request_count(), 4);
        // No increase, cache being used
        let df_meta =
            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
        let stats = df_meta.fetch_statistics(&schema).await?;
        assert_eq!(store.request_count(), 4);

        assert_eq!(stats.num_rows, Precision::Exact(3));
        let c1_stats = &stats.column_statistics[0];
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c1_stats.null_count, Precision::Exact(1));
        assert_eq!(c2_stats.null_count, Precision::Exact(3));

        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));

        // Use a size hint larger than the file size to make sure we don't panic
        let size_hint = (meta[0].size + 100) as usize;
        let df_meta = DFParquetMetadata::new(store.as_ref(), &meta[0])
            .with_metadata_size_hint(Some(size_hint));

        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 1);

        // No increase because the cache has an entry
        let df_meta =
            df_meta.with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)));
        df_meta.fetch_metadata().await?;
        assert_eq!(store.request_count(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn fetch_metadata_with_size_hint() -> Result<()> {
        _run_fetch_metadata_with_size_hint(ForceViews::No).await?;
        _run_fetch_metadata_with_size_hint(ForceViews::Yes).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_statistics_from_parquet_metadata_dictionary() -> Result<()> {
        // Data for column c_dic: ["a", "b", "c", "d"]
        let values = StringArray::from_iter_values(["a", "b", "c", "d"]);
        let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
        let dic_array = DictionaryArray::<Int32Type>::try_new(keys, Arc::new(values))?;
        let c_dic: ArrayRef = Arc::new(dic_array);

        // Data for column string_truncation: ["a".repeat(128), null, "b".repeat(128), null]
        let string_truncation: ArrayRef = Arc::new(StringArray::from(vec![
            Some("a".repeat(128)),
            None,
            Some("b".repeat(128)),
            None,
        ]));

        let batch1 = RecordBatch::try_from_iter(vec![
            ("c_dic", c_dic),
            ("string_truncation", string_truncation),
        ])?;

        // Use store_parquet to write each batch to its own file
        // . batch1 written into the first file and includes:
        //    - column c_dic with 4 rows and no nulls. Min and max stats for the dictionary column are available.
        //    - column string_truncation with 4 rows and 2 nulls. Min and max stats for the string column are available but not exact.
        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
        let (files, _file_names) = store_parquet(vec![batch1], false).await?;

        let state = SessionContext::new().state();
        // Leave the metadata size hint as None to keep the original behavior
        let format = ParquetFormat::default().with_metadata_size_hint(None);
        let _schema = format.infer_schema(&state, &store.upcast(), &files).await?;
        assert_eq!(store.request_count(), 3);
        // No increase, cache being used.
        let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
        assert_eq!(store.request_count(), 3);

        // No increase in request count because the cache is not empty
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
            .fetch_statistics(&schema)
            .await?;
        assert_eq!(stats.num_rows, Precision::Exact(4));

        // column c_dic
        let c_dic_stats = &stats.column_statistics[0];

        assert_eq!(c_dic_stats.null_count, Precision::Exact(0));
        assert_eq!(
            c_dic_stats.max_value,
            Precision::Exact(Utf8(Some("d".into())))
        );
        assert_eq!(
            c_dic_stats.min_value,
            Precision::Exact(Utf8(Some("a".into())))
        );

        // column string_truncation
        let string_truncation_stats = &stats.column_statistics[1];

        assert_eq!(string_truncation_stats.null_count, Precision::Exact(2));
        assert_eq!(
            string_truncation_stats.max_value,
            Precision::Inexact(ScalarValue::Utf8View(Some("b".repeat(63) + "c")))
        );
        assert_eq!(
            string_truncation_stats.min_value,
            Precision::Inexact(ScalarValue::Utf8View(Some("a".repeat(64))))
        );

        Ok(())
    }

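    /// Writes `c1` (strings) and `c2` (ints) to separate parquet files, infers
    /// a merged schema, and checks per-file statistics for both the present and
    /// the missing column, with and without view types.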
    async fn _run_test_statistics_from_parquet_metadata(
        force_views: ForceViews,
    ) -> Result<()> {
        // Data for column c1: ["Foo", null, "bar"]
        let c1: ArrayRef =
            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())])?;

        // Data for column c2: [1, 2, null]
        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)])?;

        // Use store_parquet to write each batch to its own file
        // . batch1 written into the first file and includes:
        //    - column c1 with 3 rows and one null. Min and max stats for the string column are missing in this test even though the column has values
        // . batch2 written into the second file and includes:
        //    - column c2 with 3 rows and one null. Min and max stats for the int column are available: 1 and 2 respectively
        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
            LocalFileSystem::new(),
        )));
        let (files, _file_names) = store_parquet(vec![batch1, batch2], false).await?;

        let force_views = match force_views {
            ForceViews::Yes => true,
            ForceViews::No => false,
        };

        let mut state = SessionContext::new().state();
        state = set_view_state(state, force_views);
        let format = ParquetFormat::default()
            .with_force_view_types(force_views)
            .with_metadata_size_hint(None);
        let schema = format.infer_schema(&state, &store.upcast(), &files).await?;
        assert_eq!(store.request_count(), 6);

        let null_i64 = ScalarValue::Int64(None);
        let null_utf8 = if force_views {
            ScalarValue::Utf8View(None)
        } else {
            Utf8(None)
        };

        // No increase in request count because the cache is not empty
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        let stats = DFParquetMetadata::new(store.as_ref(), &files[0])
            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
            .fetch_statistics(&schema)
            .await?;
        assert_eq!(store.request_count(), 6);
        assert_eq!(stats.num_rows, Precision::Exact(3));
        // column c1
        let c1_stats = &stats.column_statistics[0];
        assert_eq!(c1_stats.null_count, Precision::Exact(1));
        let expected_type = if force_views {
            ScalarValue::Utf8View
        } else {
            Utf8
        };
        assert_eq!(
            c1_stats.max_value,
            Precision::Exact(expected_type(Some("bar".to_string())))
        );
        assert_eq!(
            c1_stats.min_value,
            Precision::Exact(expected_type(Some("Foo".to_string())))
        );
        // column c2: missing from the file, so the table treats all 3 rows as null
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c2_stats.null_count, Precision::Exact(3));
        assert_eq!(c2_stats.max_value, Precision::Exact(null_i64.clone()));
        assert_eq!(c2_stats.min_value, Precision::Exact(null_i64.clone()));

        // No increase in request count because the cache is not empty
        let stats = DFParquetMetadata::new(store.as_ref(), &files[1])
            .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
            .fetch_statistics(&schema)
            .await?;
        assert_eq!(store.request_count(), 6);
        assert_eq!(stats.num_rows, Precision::Exact(3));
        // column c1: missing from the file, so the table treats all 3 rows as null
        let c1_stats = &stats.column_statistics[0];
        assert_eq!(c1_stats.null_count, Precision::Exact(3));
        assert_eq!(c1_stats.max_value, Precision::Exact(null_utf8.clone()));
        assert_eq!(c1_stats.min_value, Precision::Exact(null_utf8.clone()));
        // column c2
        let c2_stats = &stats.column_statistics[1];
        assert_eq!(c2_stats.null_count, Precision::Exact(1));
        assert_eq!(c2_stats.max_value, Precision::Exact(2i64.into()));
        assert_eq!(c2_stats.min_value, Precision::Exact(1i64.into()));

        Ok(())
    }

    #[tokio::test]
    async fn test_statistics_from_parquet_metadata() -> Result<()> {
        _run_test_statistics_from_parquet_metadata(ForceViews::No).await?;

        _run_test_statistics_from_parquet_metadata(ForceViews::Yes).await?;

        Ok(())
    }

    #[tokio::test]
    async fn read_small_batches() -> Result<()> {
        let config = SessionConfig::new().with_batch_size(2);
        let session_ctx = SessionContext::new_with_config(config);
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = None;
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;
        let stream = exec.execute(0, task_ctx)?;

        let tt_batches = stream
            .map(|batch| {
                let batch = batch.unwrap();
                assert_eq!(11, batch.num_columns());
                assert_eq!(2, batch.num_rows());
            })
            .fold(0, |acc, _| async move { acc + 1i32 })
            .await;

        assert_eq!(tt_batches, 4 /* 8/2 */);

        // test metadata
        assert_eq!(
            exec.partition_statistics(None)?.num_rows,
            Precision::Exact(8)
        );
        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
        assert_eq!(
            exec.partition_statistics(None)?.total_byte_size,
            Precision::Exact(671)
        );

        Ok(())
    }

    #[tokio::test]
    async fn capture_bytes_scanned_metric() -> Result<()> {
        let config = SessionConfig::new().with_batch_size(2);
        let session = SessionContext::new_with_config(config);
        let ctx = session.state();

        // Read the full file
        let projection = None;
        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;

        // Read only one column. This should scan less data.
        let projection = Some(vec![0]);
        let exec_projected =
            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;

        let task_ctx = ctx.task_ctx();

        let _ = collect(exec.clone(), task_ctx.clone()).await?;
        let _ = collect(exec_projected.clone(), task_ctx).await?;

        assert_bytes_scanned(exec, 671);
        assert_bytes_scanned(exec_projected, 73);

        Ok(())
    }

    #[tokio::test]
    async fn read_limit() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = None;
        let exec =
            get_exec(&state, "alltypes_plain.parquet", projection, Some(1)).await?;

        // note: even if the limit is set, the executor rounds up to the batch size
        assert_eq!(
            exec.partition_statistics(None)?.num_rows,
            Precision::Exact(8)
        );
        // TODO correct byte size: https://github.com/apache/datafusion/issues/14936
        assert_eq!(
            exec.partition_statistics(None)?.total_byte_size,
            Precision::Exact(671)
        );
        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(11, batches[0].num_columns());
        assert_eq!(1, batches[0].num_rows());

        Ok(())
    }

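    /// Re-registers the parquet file format on `state` with
    /// `schema_force_view_types` set to `use_views`, so scans read string and
    /// binary columns as view types when requested.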
    fn set_view_state(mut state: SessionState, use_views: bool) -> SessionState {
        let mut options = TableParquetOptions::default();
        options.global.schema_force_view_types = use_views;
        state
            .register_file_format(
                Arc::new(ParquetFormatFactory::new_with_options(options)),
                true,
            )
            .expect("ok");
        state
    }

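    /// Scans `alltypes_plain.parquet` with or without view types and compares
    /// the resulting schema (rendered as `name: type` lines) against `expected`.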
    async fn _run_read_alltypes_plain_parquet(
        force_views: ForceViews,
        expected: &str,
    ) -> Result<()> {
        let force_views = match force_views {
            ForceViews::Yes => true,
            ForceViews::No => false,
        };

        let session_ctx = SessionContext::new();
        let mut state = session_ctx.state();
        state = set_view_state(state, force_views);

        let task_ctx = state.task_ctx();
        let projection = None;
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let x: Vec<String> = exec
            .schema()
            .fields()
            .iter()
            .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
            .collect();
        let y = x.join("\n");
        assert_eq!(expected, y);

        let batches = collect(exec, task_ctx).await?;

        assert_eq!(1, batches.len());
        assert_eq!(11, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        Ok(())
    }

    #[tokio::test]
    async fn read_alltypes_plain_parquet() -> Result<()> {
        let no_views = "id: Int32\n\
             bool_col: Boolean\n\
             tinyint_col: Int32\n\
             smallint_col: Int32\n\
             int_col: Int32\n\
             bigint_col: Int64\n\
             float_col: Float32\n\
             double_col: Float64\n\
             date_string_col: Binary\n\
             string_col: Binary\n\
             timestamp_col: Timestamp(Nanosecond, None)";
        _run_read_alltypes_plain_parquet(ForceViews::No, no_views).await?;

        let with_views = "id: Int32\n\
             bool_col: Boolean\n\
             tinyint_col: Int32\n\
             smallint_col: Int32\n\
             int_col: Int32\n\
             bigint_col: Int64\n\
             float_col: Float32\n\
             double_col: Float64\n\
             date_string_col: BinaryView\n\
             string_col: BinaryView\n\
             timestamp_col: Timestamp(Nanosecond, None)";
        _run_read_alltypes_plain_parquet(ForceViews::Yes, with_views).await?;

        Ok(())
    }

    #[tokio::test]
    async fn read_bool_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![1]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_boolean_array(batches[0].column(0))?;
        let mut values: Vec<bool> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[true, false, true, false, true, false, true, false]",
            format!("{values:?}")
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_i32_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![0]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_int32_array(batches[0].column(0))?;
        let mut values: Vec<i32> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{values:?}"));

        Ok(())
    }

    #[tokio::test]
    async fn read_i96_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![10]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_timestamp_nanosecond_array(batches[0].column(0))?;
        let mut values: Vec<i64> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!("[1235865600000000000, 1235865660000000000, 1238544000000000000, 1238544060000000000, 1233446400000000000, 1233446460000000000, 1230768000000000000, 1230768060000000000]", format!("{values:?}"));

        Ok(())
    }

    #[tokio::test]
    async fn read_f32_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![6]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_float32_array(batches[0].column(0))?;
        let mut values: Vec<f32> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[0.0, 1.1, 0.0, 1.1, 0.0, 1.1, 0.0, 1.1]",
            format!("{values:?}")
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_f64_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();
        let projection = Some(vec![7]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_float64_array(batches[0].column(0))?;
        let mut values: Vec<f64> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(array.value(i));
        }

        assert_eq!(
            "[0.0, 10.1, 0.0, 10.1, 0.0, 10.1, 0.0, 10.1]",
            format!("{values:?}")
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_binary_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let mut state = session_ctx.state();
        state = set_view_state(state, false);

        let task_ctx = state.task_ctx();
        let projection = Some(vec![9]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_binary_array(batches[0].column(0))?;
        let mut values: Vec<&str> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(std::str::from_utf8(array.value(i)).unwrap());
        }

        assert_eq!(
            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
            format!("{values:?}")
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_binaryview_alltypes_plain_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let mut state = session_ctx.state();
        state = set_view_state(state, true);

        let task_ctx = state.task_ctx();
        let projection = Some(vec![9]);
        let exec = get_exec(&state, "alltypes_plain.parquet", projection, None).await?;

        let batches = collect(exec, task_ctx).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        assert_eq!(8, batches[0].num_rows());

        let array = as_binary_view_array(batches[0].column(0))?;
        let mut values: Vec<&str> = vec![];
        for i in 0..batches[0].num_rows() {
            values.push(std::str::from_utf8(array.value(i)).unwrap());
        }

        assert_eq!(
            "[\"0\", \"1\", \"0\", \"1\", \"0\", \"1\", \"0\", \"1\"]",
            format!("{values:?}")
        );

        Ok(())
    }

    #[tokio::test]
    async fn read_decimal_parquet() -> Result<()> {
        let session_ctx = SessionContext::new();
        let state = session_ctx.state();
        let task_ctx = state.task_ctx();

        // parquet uses int32 as the physical type to store this decimal
        let exec = get_exec(&state, "int32_decimal.parquet", None, None).await?;
        let batches = collect(exec, task_ctx.clone()).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        let column = batches[0].column(0);
        assert_eq!(&DataType::Decimal128(4, 2), column.data_type());

        // parquet uses int64 as the physical type to store this decimal
        let exec = get_exec(&state, "int64_decimal.parquet", None, None).await?;
        let batches = collect(exec, task_ctx.clone()).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        let column = batches[0].column(0);
        assert_eq!(&DataType::Decimal128(10, 2), column.data_type());

        // parquet uses fixed length binary as the physical type to store this decimal
        let exec = get_exec(&state, "fixed_length_decimal.parquet", None, None).await?;
        let batches = collect(exec, task_ctx.clone()).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        let column = batches[0].column(0);
        assert_eq!(&DataType::Decimal128(25, 2), column.data_type());

        let exec =
            get_exec(&state, "fixed_length_decimal_legacy.parquet", None, None).await?;
        let batches = collect(exec, task_ctx.clone()).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        let column = batches[0].column(0);
        assert_eq!(&DataType::Decimal128(13, 2), column.data_type());

        // parquet uses byte array as the physical type to store this decimal
        let exec = get_exec(&state, "byte_array_decimal.parquet", None, None).await?;
        let batches = collect(exec, task_ctx.clone()).await?;
        assert_eq!(1, batches.len());
        assert_eq!(1, batches[0].num_columns());
        let column = batches[0].column(0);
        assert_eq!(&DataType::Decimal128(4, 2), column.data_type());

        Ok(())
    }

    #[tokio::test]
    async fn test_read_parquet_page_index() -> Result<()> {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).await?;
        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder =
            ParquetRecordBatchStreamBuilder::new_with_options(file, options.clone())
                .await?
                .metadata()
                .clone();
        check_page_index_validation(builder.column_index(), builder.offset_index());

        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let file = File::open(path).await?;

        let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options)
            .await?
            .metadata()
            .clone();
        check_page_index_validation(builder.column_index(), builder.offset_index());

        Ok(())
    }

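    /// Validates the page index and offset index of `alltypes_tiny_pages*.parquet`:
    /// one row group, 13 columns, and 325 data pages for `int_col`, all non-null.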
    fn check_page_index_validation(
        page_index: Option<&ParquetColumnIndex>,
        offset_index: Option<&ParquetOffsetIndex>,
    ) {
        assert!(page_index.is_some());
        assert!(offset_index.is_some());

        let page_index = page_index.unwrap();
        let offset_index = offset_index.unwrap();

        // there is only one row group in the file
        assert_eq!(page_index.len(), 1);
        assert_eq!(offset_index.len(), 1);
        let page_index = page_index.first().unwrap();
        let offset_index = offset_index.first().unwrap();

        // 13 columns in one row group
        assert_eq!(page_index.len(), 13);
        assert_eq!(offset_index.len(), 13);

        // check the index for int_col
        let int_col_index = page_index.get(4).unwrap();
        let int_col_offset = offset_index.get(4).unwrap().page_locations();

        // 325 pages in int_col
        assert_eq!(int_col_offset.len(), 325);
        let ColumnIndexMetaData::INT32(index) = int_col_index else {
            panic!("failed to read page index.")
        };
        assert_eq!(index.min_values().len(), 325);
        assert_eq!(index.max_values().len(), 325);
        // all values are non-null
        for idx in 0..325 {
            assert_eq!(index.null_count(idx), Some(0));
        }
    }

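    /// Asserts that the plan recorded a `bytes_scanned` metric equal to `expected`.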
    fn assert_bytes_scanned(exec: Arc<dyn ExecutionPlan>, expected: usize) {
        let actual = exec
            .metrics()
            .expect("Metrics not recorded")
            .sum(|metric| matches!(metric.value(), MetricValue::Count { name, .. } if name == "bytes_scanned"))
            .map(|t| t.as_usize())
            .expect("bytes_scanned metric not recorded");

        assert_eq!(actual, expected);
    }

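    /// Builds an `ExecutionPlan` scanning `file_name` from the parquet test data
    /// directory, using the session's registered parquet format factory (or a
    /// default `ParquetFormat` if none is registered).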
    async fn get_exec(
        state: &dyn Session,
        file_name: &str,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let testdata = datafusion_common::test_util::parquet_test_data();
        let state = state.as_any().downcast_ref::<SessionState>().unwrap();
        let format = state
            .get_file_format_factory("parquet")
            .map(|factory| factory.create(state, &Default::default()).unwrap())
            .unwrap_or_else(|| Arc::new(ParquetFormat::new()));

        scan_format(
            state, &*format, None, &testdata, file_name, projection, limit,
        )
        .await
    }

    /// Test that 0-byte files don't break while reading
    #[tokio::test]
    async fn test_read_empty_parquet() -> Result<()> {
        let tmp_dir = tempfile::TempDir::new()?;
        let path = format!("{}/empty.parquet", tmp_dir.path().to_string_lossy());
        File::create(&path).await?;

        let ctx = SessionContext::new();

        let df = ctx
            .read_parquet(&path, ParquetReadOptions::default())
            .await
            .expect("read_parquet should succeed");

        let result = df.collect().await?;

        assert_snapshot!(batches_to_string(&result), @r###"
            ++
            ++
       "###);

        Ok(())
    }

    /// Test that 0-byte files don't break while reading
    #[tokio::test]
    async fn test_read_partitioned_empty_parquet() -> Result<()> {
        let tmp_dir = tempfile::TempDir::new()?;
        let partition_dir = tmp_dir.path().join("col1=a");
        std::fs::create_dir(&partition_dir)?;
        File::create(partition_dir.join("empty.parquet")).await?;

        let ctx = SessionContext::new();

        let df = ctx
            .read_parquet(
                tmp_dir.path().to_str().unwrap(),
                ParquetReadOptions::new()
                    .table_partition_cols(vec![("col1".to_string(), DataType::Utf8)]),
            )
            .await
            .expect("read_parquet should succeed");

        let result = df.collect().await?;

        assert_snapshot!(batches_to_string(&result), @r###"
            ++
            ++
       "###);

        Ok(())
    }

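    /// Builds a `TaskContext` with single-file parallelism enabled and a
    /// `LocalFileSystem` rooted at a temp directory registered under `store_url`.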
    fn build_ctx(store_url: &url::Url) -> Arc<TaskContext> {
        let tmp_dir = tempfile::TempDir::new().unwrap();
        let local = Arc::new(
            LocalFileSystem::new_with_prefix(&tmp_dir)
                .expect("should create object store"),
        );

        let mut session = SessionConfig::default();
        let parquet_opts = ParquetOptions {
            allow_single_file_parallelism: true,
            ..Default::default()
        };
        session.options_mut().execution.parquet = parquet_opts;

        let runtime = RuntimeEnv::default();
        runtime
            .object_store_registry
            .register_store(store_url, local);

        Arc::new(
            TaskContext::default()
                .with_session_config(session)
                .with_runtime(Arc::new(runtime)),
        )
    }

    #[tokio::test]
    async fn parquet_sink_write() -> Result<()> {
        let parquet_sink = create_written_parquet_sink("file:///").await?;

        // assert written to proper path
        let (path, file_metadata) = get_written(parquet_sink)?;
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(path_parts.len(), 1, "should not have path prefix");

        // check the file metadata
        let expected_kv_meta = vec![
            // default is to include arrow schema
            KeyValue {
                key: "ARROW:schema".to_string(),
                value: Some(ENCODED_ARROW_SCHEMA.to_string()),
            },
            KeyValue {
                key: "my-data".to_string(),
                value: Some("stuff".to_string()),
            },
            KeyValue {
                key: "my-data-bool-key".to_string(),
                value: None,
            },
        ];
        assert_file_metadata(file_metadata, &expected_kv_meta);

        Ok(())
    }

    #[tokio::test]
    async fn parquet_sink_parallel_write() -> Result<()> {
        let opts = ParquetOptions {
            allow_single_file_parallelism: true,
            maximum_parallel_row_group_writers: 2,
            maximum_buffered_record_batches_per_stream: 2,
            ..Default::default()
        };

        let parquet_sink =
            create_written_parquet_sink_using_config("file:///", opts).await?;

        // assert written to proper path
        let (path, file_metadata) = get_written(parquet_sink)?;
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(path_parts.len(), 1, "should not have path prefix");

        // check the file metadata
        let expected_kv_meta = vec![
            // default is to include arrow schema
            KeyValue {
                key: "ARROW:schema".to_string(),
                value: Some(ENCODED_ARROW_SCHEMA.to_string()),
            },
            KeyValue {
                key: "my-data".to_string(),
                value: Some("stuff".to_string()),
            },
            KeyValue {
                key: "my-data-bool-key".to_string(),
                value: None,
            },
        ];
        assert_file_metadata(file_metadata, &expected_kv_meta);

        Ok(())
    }

1339    #[tokio::test]
1340    async fn test_write_empty_recordbatch_creates_file() -> Result<()> {
1341        let empty_record_batch = RecordBatch::try_new(
1342            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
1343            vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
1344        )
1345        .expect("Failed to create empty RecordBatch");
1346
1347        let tmp_dir = tempfile::TempDir::new()?;
1348        let path = format!("{}/empty2.parquet", tmp_dir.path().to_string_lossy());
1349
1350        let ctx = SessionContext::new();
1351        let df = ctx.read_batch(empty_record_batch.clone())?;
1352        df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
1353            .await?;
1354        assert!(std::path::Path::new(&path).exists());
1355
1356        let stream = ctx
1357            .read_parquet(&path, ParquetReadOptions::new())
1358            .await?
1359            .execute_stream()
1360            .await?;
1361        assert_eq!(stream.schema(), empty_record_batch.schema());
1362        let results = stream.collect::<Vec<_>>().await;
1363        assert_eq!(results.len(), 0);
1364        Ok(())
1365    }
1366
    #[tokio::test]
    async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
        // expected kv metadata without schema
        let expected_without = vec![
            KeyValue {
                key: "my-data".to_string(),
                value: Some("stuff".to_string()),
            },
            KeyValue {
                key: "my-data-bool-key".to_string(),
                value: None,
            },
        ];
        // expected kv metadata with schema
        let expected_with = [
            vec![KeyValue {
                key: "ARROW:schema".to_string(),
                value: Some(ENCODED_ARROW_SCHEMA.to_string()),
            }],
            expected_without.clone(),
        ]
        .concat();

        // single threaded write, skip insert
        let opts = ParquetOptions {
            allow_single_file_parallelism: false,
            skip_arrow_metadata: true,
            ..Default::default()
        };
        let parquet_sink =
            create_written_parquet_sink_using_config("file:///", opts).await?;
        let (_, file_metadata) = get_written(parquet_sink)?;
        assert_file_metadata(file_metadata, &expected_without);

        // single threaded write, do not skip insert
        let opts = ParquetOptions {
            allow_single_file_parallelism: false,
            skip_arrow_metadata: false,
            ..Default::default()
        };
        let parquet_sink =
            create_written_parquet_sink_using_config("file:///", opts).await?;
        let (_, file_metadata) = get_written(parquet_sink)?;
        assert_file_metadata(file_metadata, &expected_with);

        // multithreaded write, skip insert
        let opts = ParquetOptions {
            allow_single_file_parallelism: true,
            maximum_parallel_row_group_writers: 2,
            maximum_buffered_record_batches_per_stream: 2,
            skip_arrow_metadata: true,
            ..Default::default()
        };
        let parquet_sink =
            create_written_parquet_sink_using_config("file:///", opts).await?;
        let (_, file_metadata) = get_written(parquet_sink)?;
        assert_file_metadata(file_metadata, &expected_without);

        // multithreaded write, do not skip insert
        let opts = ParquetOptions {
            allow_single_file_parallelism: true,
            maximum_parallel_row_group_writers: 2,
            maximum_buffered_record_batches_per_stream: 2,
            skip_arrow_metadata: false,
            ..Default::default()
        };
        let parquet_sink =
            create_written_parquet_sink_using_config("file:///", opts).await?;
        let (_, file_metadata) = get_written(parquet_sink)?;
        assert_file_metadata(file_metadata, &expected_with);

        Ok(())
    }

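    // A user-supplied destination filename with a custom extension should be kept
    // as the name of the written file.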
    #[tokio::test]
    async fn parquet_sink_write_with_extension() -> Result<()> {
        let filename = "test_file.custom_ext";
        let file_path = format!("file:///path/to/{filename}");
        let parquet_sink = create_written_parquet_sink(file_path.as_str()).await?;

        // assert written to proper path
        let (path, _) = get_written(parquet_sink)?;
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(
            path_parts.len(),
            3,
            "Expected 3 path parts, instead found {}",
            path_parts.len()
        );
        assert_eq!(path_parts.last().unwrap().as_ref(), filename);

        Ok(())
    }

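    // When the table path is a directory (no trailing slash), a generated file name
    // ending in `.parquet` should be written inside that directory.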
    #[tokio::test]
    async fn parquet_sink_write_with_directory_name() -> Result<()> {
        let file_path = "file:///path/to";
        let parquet_sink = create_written_parquet_sink(file_path).await?;

        // assert written to proper path
        let (path, _) = get_written(parquet_sink)?;
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(
            path_parts.len(),
            3,
            "Expected 3 path parts, instead found {}",
            path_parts.len()
        );
        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));

        Ok(())
    }

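    // A table path with a trailing slash should behave like a directory path:
    // a generated `.parquet` file is written inside it.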
    #[tokio::test]
    async fn parquet_sink_write_with_folder_ending() -> Result<()> {
        let file_path = "file:///path/to/";
        let parquet_sink = create_written_parquet_sink(file_path).await?;

        // assert written to proper path
        let (path, _) = get_written(parquet_sink)?;
        let path_parts = path.parts().collect::<Vec<_>>();
        assert_eq!(
            path_parts.len(),
            3,
            "Expected 3 path parts, instead found {}",
            path_parts.len()
        );
        assert!(path_parts.last().unwrap().as_ref().ends_with(".parquet"));

        Ok(())
    }

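    /// Writes a small batch through a [`ParquetSink`] configured with the default
    /// [`ParquetOptions`] and returns the sink for inspection.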
    async fn create_written_parquet_sink(table_path: &str) -> Result<Arc<ParquetSink>> {
        create_written_parquet_sink_using_config(table_path, ParquetOptions::default())
            .await
    }

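    /// Base64-encoded serialized Arrow schema for the two Utf8 columns (`a`, `b`)
    /// used by these tests; the writer stores this value under the `ARROW:schema`
    /// key in the parquet key-value metadata.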
    static ENCODED_ARROW_SCHEMA: &str = "/////5QAAAAQAAAAAAAKAAwACgAJAAQACgAAABAAAAAAAQQACAAIAAAABAAIAAAABAAAAAIAAAA8AAAABAAAANz///8UAAAADAAAAAAAAAUMAAAAAAAAAMz///8BAAAAYgAAABAAFAAQAAAADwAEAAAACAAQAAAAGAAAAAwAAAAAAAAFEAAAAAAAAAAEAAQABAAAAAEAAABhAAAA";

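    /// Builds a `ParquetSink` for `table_path` with the given [`ParquetOptions`] and
    /// custom key-value metadata, writes a single two-row batch through
    /// `FileSink::write_all`, and returns the sink so tests can inspect what was written.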
    async fn create_written_parquet_sink_using_config(
        table_path: &str,
        global: ParquetOptions,
    ) -> Result<Arc<ParquetSink>> {
        // schema should match the ENCODED_ARROW_SCHEMA above
        let field_a = Field::new("a", DataType::Utf8, false);
        let field_b = Field::new("b", DataType::Utf8, false);
        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
        let object_store_url = ObjectStoreUrl::local_filesystem();

        let file_sink_config = FileSinkConfig {
            original_url: String::default(),
            object_store_url: object_store_url.clone(),
            file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
            table_paths: vec![ListingTableUrl::parse(table_path)?],
            output_schema: schema.clone(),
            table_partition_cols: vec![],
            insert_op: InsertOp::Overwrite,
            keep_partition_by_columns: false,
            file_extension: "parquet".into(),
        };
        let parquet_sink = Arc::new(ParquetSink::new(
            file_sink_config,
            TableParquetOptions {
                key_value_metadata: std::collections::HashMap::from([
                    ("my-data".to_string(), Some("stuff".to_string())),
                    ("my-data-bool-key".to_string(), None),
                ]),
                global,
                ..Default::default()
            },
        ));

        // create data
        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;

        // write stream
        FileSink::write_all(
            parquet_sink.as_ref(),
            Box::pin(RecordBatchStreamAdapter::new(
                schema,
                futures::stream::iter(vec![Ok(batch)]),
            )),
            &build_ctx(object_store_url.as_ref()),
        )
        .await?;

        Ok(parquet_sink)
    }

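    /// Returns the path and [`ParquetMetaData`] of the single file written by the
    /// sink, failing if it produced more or fewer than one file.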
    fn get_written(parquet_sink: Arc<ParquetSink>) -> Result<(Path, ParquetMetaData)> {
        let mut written = parquet_sink.written();
        let written = written.drain();
        assert_eq!(
            written.len(),
            1,
            "expected a single parquet file to be written, instead found {}",
            written.len()
        );

        let (path, parquet_meta_data) = written.take(1).next().unwrap();
        Ok((path, parquet_meta_data))
    }

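    /// Asserts that the written file has two rows, contains both columns `a` and `b`,
    /// and carries exactly the expected key-value metadata (compared after sorting by
    /// key so the order is deterministic).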
    fn assert_file_metadata(
        parquet_meta_data: ParquetMetaData,
        expected_kv: &Vec<KeyValue>,
    ) {
        let file_metadata = parquet_meta_data.file_metadata();
        let schema_descr = file_metadata.schema_descr();
        assert_eq!(file_metadata.num_rows(), 2, "file metadata should have 2 rows");
        assert!(
            schema_descr
                .columns()
                .iter()
                .any(|col_schema| col_schema.name() == "a"),
            "output file metadata should contain col a"
        );
        assert!(
            schema_descr
                .columns()
                .iter()
                .any(|col_schema| col_schema.name() == "b"),
            "output file metadata should contain col b"
        );

        let mut key_value_metadata = file_metadata.key_value_metadata().unwrap().clone();
        key_value_metadata.sort_by(|a, b| a.key.cmp(&b.key));
        assert_eq!(&key_value_metadata, expected_kv);
    }

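    // Hive-style partitioning on column `a` should produce one output file per
    // distinct partition value, with the partition column dropped from each file's
    // schema and encoded in its path prefix instead.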
    #[tokio::test]
    async fn parquet_sink_write_partitions() -> Result<()> {
        let field_a = Field::new("a", DataType::Utf8, false);
        let field_b = Field::new("b", DataType::Utf8, false);
        let schema = Arc::new(Schema::new(vec![field_a, field_b]));
        let object_store_url = ObjectStoreUrl::local_filesystem();

        // set file config to include partitioning on field_a
        let file_sink_config = FileSinkConfig {
            original_url: String::default(),
            object_store_url: object_store_url.clone(),
            file_group: FileGroup::new(vec![PartitionedFile::new("/tmp".to_string(), 1)]),
            table_paths: vec![ListingTableUrl::parse("file:///")?],
            output_schema: schema.clone(),
            table_partition_cols: vec![("a".to_string(), DataType::Utf8)], // add partitioning
            insert_op: InsertOp::Overwrite,
            keep_partition_by_columns: false,
            file_extension: "parquet".into(),
        };
        let parquet_sink = Arc::new(ParquetSink::new(
            file_sink_config,
            TableParquetOptions::default(),
        ));

        // create data with 2 partitions
        let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
        let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
        let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;

        // write stream
        FileSink::write_all(
            parquet_sink.as_ref(),
            Box::pin(RecordBatchStreamAdapter::new(
                schema,
                futures::stream::iter(vec![Ok(batch)]),
            )),
            &build_ctx(object_store_url.as_ref()),
        )
        .await?;

        // assert written
        let mut written = parquet_sink.written();
        let written = written.drain();
        assert_eq!(
            written.len(),
            2,
            "expected two parquet files to be written, instead found {}",
            written.len()
        );

        // check the file metadata includes partitions
        let mut expected_partitions = std::collections::HashSet::from(["a=foo", "a=bar"]);
        for (path, parquet_metadata) in written.take(2) {
            let file_metadata = parquet_metadata.file_metadata();
            let schema = file_metadata.schema_descr();
            let num_rows = file_metadata.num_rows();

            let path_parts = path.parts().collect::<Vec<_>>();
            assert_eq!(path_parts.len(), 2, "should have path prefix");

            let prefix = path_parts[0].as_ref();
            assert!(
                expected_partitions.contains(prefix),
                "expected path prefix to match partition, instead found {prefix:?}"
            );
            expected_partitions.remove(prefix);

            assert_eq!(num_rows, 1, "file metadata should have 1 row");
            assert!(
                !schema
                    .columns()
                    .iter()
                    .any(|col_schema| col_schema.name() == "a"),
                "output file metadata should not contain partitioned col a"
            );
            assert!(
                schema
                    .columns()
                    .iter()
                    .any(|col_schema| col_schema.name() == "b"),
                "output file metadata should contain col b"
            );
        }

        Ok(())
    }

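    // Memory used while encoding parquet should be registered with the memory pool
    // in the task context during the write and fully released once the write
    // completes, for serial, row-parallel, and column-parallel configurations.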
    #[tokio::test]
    async fn parquet_sink_write_memory_reservation() -> Result<()> {
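        // Helper: runs one write with the given options and checks the memory pool
        // reservation while the write is in flight and after it finishes.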
        async fn test_memory_reservation(global: ParquetOptions) -> Result<()> {
            let field_a = Field::new("a", DataType::Utf8, false);
            let field_b = Field::new("b", DataType::Utf8, false);
            let schema = Arc::new(Schema::new(vec![field_a, field_b]));
            let object_store_url = ObjectStoreUrl::local_filesystem();

            let file_sink_config = FileSinkConfig {
                original_url: String::default(),
                object_store_url: object_store_url.clone(),
                file_group: FileGroup::new(vec![PartitionedFile::new(
                    "/tmp".to_string(),
                    1,
                )]),
                table_paths: vec![ListingTableUrl::parse("file:///")?],
                output_schema: schema.clone(),
                table_partition_cols: vec![],
                insert_op: InsertOp::Overwrite,
                keep_partition_by_columns: false,
                file_extension: "parquet".into(),
            };
            let parquet_sink = Arc::new(ParquetSink::new(
                file_sink_config,
                TableParquetOptions {
                    key_value_metadata: std::collections::HashMap::from([
                        ("my-data".to_string(), Some("stuff".to_string())),
                        ("my-data-bool-key".to_string(), None),
                    ]),
                    global,
                    ..Default::default()
                },
            ));

            // create data
            let col_a: ArrayRef = Arc::new(StringArray::from(vec!["foo", "bar"]));
            let col_b: ArrayRef = Arc::new(StringArray::from(vec!["baz", "baz"]));
            let batch = RecordBatch::try_from_iter(vec![("a", col_a), ("b", col_b)])?;

            // create task context
            let task_context = build_ctx(object_store_url.as_ref());
            assert_eq!(
                task_context.memory_pool().reserved(),
                0,
                "no bytes are reserved yet"
            );

            let mut write_task = FileSink::write_all(
                parquet_sink.as_ref(),
                Box::pin(RecordBatchStreamAdapter::new(
                    schema,
                    bounded_stream(batch, 1000),
                )),
                &task_context,
            );

            // incrementally poll and check for memory reservation
            let mut reserved_bytes = 0;
            while futures::poll!(&mut write_task).is_pending() {
                reserved_bytes += task_context.memory_pool().reserved();
                tokio::time::sleep(Duration::from_micros(1)).await;
            }
            assert!(
                reserved_bytes > 0,
                "should have bytes reserved during write"
            );
            assert_eq!(
                task_context.memory_pool().reserved(),
                0,
                "no leaked byte reservation"
            );

            Ok(())
        }

        let write_opts = ParquetOptions {
            allow_single_file_parallelism: false,
            ..Default::default()
        };
        test_memory_reservation(write_opts)
            .await
            .expect("should track for non-parallel writes");

        let row_parallel_write_opts = ParquetOptions {
            allow_single_file_parallelism: true,
            maximum_parallel_row_group_writers: 10,
            maximum_buffered_record_batches_per_stream: 1,
            ..Default::default()
        };
        test_memory_reservation(row_parallel_write_opts)
            .await
            .expect("should track for row-parallel writes");

        let col_parallel_write_opts = ParquetOptions {
            allow_single_file_parallelism: true,
            maximum_parallel_row_group_writers: 1,
            maximum_buffered_record_batches_per_stream: 2,
            ..Default::default()
        };
        test_memory_reservation(col_parallel_write_opts)
            .await
            .expect("should track for column-parallel writes");

        Ok(())
    }
}