datafusion/datasource/physical_plan/
parquet.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Reexports the [`datafusion_datasource_parquet`] crate, containing Parquet based [`FileSource`].
19//!
20//! [`FileSource`]: datafusion_datasource::file::FileSource
21
22pub use datafusion_datasource_parquet::*;
23
24#[cfg(test)]
25mod tests {
26    // See also `parquet_exec` integration test
27    use std::fs::{self, File};
28    use std::io::Write;
29    use std::sync::Arc;
30    use std::sync::Mutex;
31
32    use crate::dataframe::DataFrameWriteOptions;
33    use crate::datasource::file_format::options::CsvReadOptions;
34    use crate::datasource::file_format::parquet::test_util::store_parquet;
35    use crate::datasource::file_format::test_util::scan_format;
36    use crate::datasource::listing::ListingOptions;
37    use crate::execution::context::SessionState;
38    use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
39    use crate::test::object_store::local_unpartitioned_file;
40    use arrow::array::{
41        ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
42        StringViewArray, StructArray, TimestampNanosecondArray,
43    };
44    use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
45    use arrow::record_batch::RecordBatch;
46    use arrow::util::pretty::pretty_format_batches;
47    use arrow_schema::{SchemaRef, TimeUnit};
48    use bytes::{BufMut, BytesMut};
49    use datafusion_common::config::TableParquetOptions;
50    use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
51    use datafusion_common::{assert_contains, Result, ScalarValue};
52    use datafusion_datasource::file_format::FileFormat;
53    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
54    use datafusion_datasource::source::DataSourceExec;
55
56    use datafusion_datasource::file::FileSource;
57    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
58    use datafusion_datasource_parquet::source::ParquetSource;
59    use datafusion_datasource_parquet::{
60        DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
61    };
62    use datafusion_execution::object_store::ObjectStoreUrl;
63    use datafusion_expr::{col, lit, when, Expr};
64    use datafusion_physical_expr::planner::logical2physical;
65    use datafusion_physical_plan::analyze::AnalyzeExec;
66    use datafusion_physical_plan::collect;
67    use datafusion_physical_plan::metrics::{
68        ExecutionPlanMetricsSet, MetricType, MetricValue, MetricsSet,
69    };
70    use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
71
72    use chrono::{TimeZone, Utc};
73    use datafusion_datasource::file_groups::FileGroup;
74    use futures::StreamExt;
75    use insta;
76    use insta::assert_snapshot;
77    use object_store::local::LocalFileSystem;
78    use object_store::path::Path;
79    use object_store::{ObjectMeta, ObjectStore};
80    use parquet::arrow::ArrowWriter;
81    use parquet::file::properties::WriterProperties;
82    use tempfile::TempDir;
83    use url::Url;
84
    /// Everything produced by a single [`RoundTrip::round_trip`] run.
    struct RoundTripResult {
        /// Data that was read back from ParquetFiles
        ///
        /// This is the raw result of collecting the scan, so it is `Err` when
        /// execution of the scan failed.
        batches: Result<Vec<RecordBatch>>,
        /// The EXPLAIN ANALYZE output
        ///
        /// Pretty-printed output of an `AnalyzeExec` that wraps a second,
        /// separately built scan over the same files.
        explain: Result<String>,
        /// The physical plan that was created (that has statistics, etc)
        ///
        /// This is the plan that produced `batches`; tests read its metrics.
        parquet_exec: Arc<DataSourceExec>,
    }
93
    /// round-trip record batches by writing each individual RecordBatch to
    /// a parquet file and then reading that parquet file with the specified
    /// options.
    ///
    /// Built with the `with_*` methods; every option is off / `None` by
    /// default (see the derived `Default`).
    #[derive(Debug, Default)]
    struct RoundTrip {
        /// Optional column indices handed to
        /// `FileScanConfigBuilder::with_projection_indices`.
        projection: Option<Vec<usize>>,
        /// Optional logical table schema to use when reading the parquet files
        ///
        /// If None, the logical schema to use will be inferred from the
        /// original data via [`Schema::try_merge`]
        table_schema: Option<SchemaRef>,
        /// Optional logical filter; it is converted to a physical expression
        /// against the table schema and set on the `ParquetSource`.
        predicate: Option<Expr>,
        /// When true, the source is built with filter pushdown and filter
        /// reordering enabled; when false, pushdown is explicitly disabled.
        pushdown_predicate: bool,
        /// When true, the page index is enabled on the source and the parquet
        /// files are written with multiple pages.
        page_index_predicate: bool,
        /// When true, bloom filters are consulted on read.
        bloom_filters: bool,
    }
110
111    impl RoundTrip {
112        fn new() -> Self {
113            Default::default()
114        }
115
116        fn with_projection(mut self, projection: Vec<usize>) -> Self {
117            self.projection = Some(projection);
118            self
119        }
120
121        /// Specify table schema.
122        ///
123        ///See  [`Self::table_schema`] for more details
124        fn with_table_schema(mut self, schema: SchemaRef) -> Self {
125            self.table_schema = Some(schema);
126            self
127        }
128
129        fn with_predicate(mut self, predicate: Expr) -> Self {
130            self.predicate = Some(predicate);
131            self
132        }
133
134        fn with_pushdown_predicate(mut self) -> Self {
135            self.pushdown_predicate = true;
136            self
137        }
138
139        fn with_page_index_predicate(mut self) -> Self {
140            self.page_index_predicate = true;
141            self
142        }
143
144        fn with_bloom_filters(mut self) -> Self {
145            self.bloom_filters = true;
146            self
147        }
148
149        /// run the test, returning only the resulting RecordBatches
150        async fn round_trip_to_batches(
151            self,
152            batches: Vec<RecordBatch>,
153        ) -> Result<Vec<RecordBatch>> {
154            self.round_trip(batches).await.batches
155        }
156
157        fn build_file_source(&self, table_schema: SchemaRef) -> Arc<dyn FileSource> {
158            // set up predicate (this is normally done by a layer higher up)
159            let predicate = self
160                .predicate
161                .as_ref()
162                .map(|p| logical2physical(p, &table_schema));
163
164            let mut source = ParquetSource::default();
165            if let Some(predicate) = predicate {
166                source = source.with_predicate(predicate);
167            }
168
169            if self.pushdown_predicate {
170                source = source
171                    .with_pushdown_filters(true)
172                    .with_reorder_filters(true);
173            } else {
174                source = source.with_pushdown_filters(false);
175            }
176
177            if self.page_index_predicate {
178                source = source.with_enable_page_index(true);
179            } else {
180                source = source.with_enable_page_index(false);
181            }
182
183            if self.bloom_filters {
184                source = source.with_bloom_filter_on_read(true);
185            } else {
186                source = source.with_bloom_filter_on_read(false);
187            }
188
189            source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
190        }
191
192        fn build_parquet_exec(
193            &self,
194            file_schema: SchemaRef,
195            file_group: FileGroup,
196            source: Arc<dyn FileSource>,
197        ) -> Arc<DataSourceExec> {
198            let base_config = FileScanConfigBuilder::new(
199                ObjectStoreUrl::local_filesystem(),
200                file_schema,
201                source,
202            )
203            .with_file_group(file_group)
204            .with_projection_indices(self.projection.clone())
205            .build();
206            DataSourceExec::from_data_source(base_config)
207        }
208
209        /// run the test, returning the `RoundTripResult`
210        ///
211        /// Each input batch is written into one or more parquet files (and thus
212        /// they could potentially have different schemas). The resulting
213        /// parquet files are then read back and filters are applied to the
214        async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
215            // If table_schema is not set, we need to merge the schema of the
216            // input batches to get a unified schema.
217            let table_schema = match &self.table_schema {
218                Some(schema) => schema,
219                None => &Arc::new(
220                    Schema::try_merge(
221                        batches.iter().map(|b| b.schema().as_ref().clone()),
222                    )
223                    .unwrap(),
224                ),
225            };
226            // If testing with page_index_predicate, write parquet
227            // files with multiple pages
228            let multi_page = self.page_index_predicate;
229            let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
230            let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
231
232            // build a ParquetExec to return the results
233            let parquet_source = self.build_file_source(Arc::clone(table_schema));
234            let parquet_exec = self.build_parquet_exec(
235                Arc::clone(table_schema),
236                file_group.clone(),
237                Arc::clone(&parquet_source),
238            );
239
240            let analyze_exec = Arc::new(AnalyzeExec::new(
241                false,
242                false,
243                vec![MetricType::SUMMARY, MetricType::DEV],
244                // use a new ParquetSource to avoid sharing execution metrics
245                self.build_parquet_exec(
246                    Arc::clone(table_schema),
247                    file_group.clone(),
248                    self.build_file_source(Arc::clone(table_schema)),
249                ),
250                Arc::new(Schema::new(vec![
251                    Field::new("plan_type", DataType::Utf8, true),
252                    Field::new("plan", DataType::Utf8, true),
253                ])),
254            ));
255
256            let session_ctx = SessionContext::new();
257            let task_ctx = session_ctx.task_ctx();
258
259            let batches = collect(
260                Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
261                task_ctx.clone(),
262            )
263            .await;
264
265            let explain = collect(analyze_exec, task_ctx.clone())
266                .await
267                .map(|batches| {
268                    let batches = pretty_format_batches(&batches).unwrap();
269                    format!("{batches}")
270                });
271
272            RoundTripResult {
273                batches,
274                explain,
275                parquet_exec,
276            }
277        }
278    }
279
280    // Add a new column with the specified field name to the RecordBatch
281    fn add_to_batch(
282        batch: &RecordBatch,
283        field_name: &str,
284        array: ArrayRef,
285    ) -> RecordBatch {
286        let mut fields = SchemaBuilder::from(batch.schema().fields());
287        fields.push(Field::new(field_name, array.data_type().clone(), true));
288        let schema = Arc::new(fields.finish());
289
290        let mut columns = batch.columns().to_vec();
291        columns.push(array);
292        RecordBatch::try_new(schema, columns).expect("error; creating record batch")
293    }
294
295    fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
296        columns.into_iter().fold(
297            RecordBatch::new_empty(Arc::new(Schema::empty())),
298            |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
299        )
300    }
301
302    #[tokio::test]
303    async fn test_pushdown_with_missing_column_in_file() {
304        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
305
306        let file_schema =
307            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
308
309        let table_schema = Arc::new(Schema::new(vec![
310            Field::new("c1", DataType::Int32, true),
311            Field::new("c2", DataType::Int32, true),
312        ]));
313
314        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
315
316        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
317        // the default behavior is to fill in missing columns with nulls.
318        // Thus this predicate will come back as false.
319        let filter = col("c2").eq(lit(1_i32));
320        let rt = RoundTrip::new()
321            .with_table_schema(table_schema.clone())
322            .with_predicate(filter.clone())
323            .with_pushdown_predicate()
324            .round_trip(vec![batch.clone()])
325            .await;
326        let total_rows = rt
327            .batches
328            .unwrap()
329            .iter()
330            .map(|b| b.num_rows())
331            .sum::<usize>();
332        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
333        let metrics = rt.parquet_exec.metrics().unwrap();
334        let metric = get_value(&metrics, "pushdown_rows_pruned");
335        assert_eq!(metric, 3, "Expected all rows to be pruned");
336
337        // If we explicitly allow nulls the rest of the predicate should work
338        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
339        let rt = RoundTrip::new()
340            .with_table_schema(table_schema.clone())
341            .with_predicate(filter.clone())
342            .with_pushdown_predicate()
343            .round_trip(vec![batch.clone()])
344            .await;
345        let batches = rt.batches.unwrap();
346
347        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
348        +----+----+
349        | c1 | c2 |
350        +----+----+
351        | 1  |    |
352        +----+----+
353        "###);
354
355        let metrics = rt.parquet_exec.metrics().unwrap();
356        let metric = get_value(&metrics, "pushdown_rows_pruned");
357        assert_eq!(metric, 2, "Expected all rows to be pruned");
358    }
359
360    #[tokio::test]
361    async fn test_pushdown_with_missing_column_in_file_multiple_types() {
362        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
363
364        let file_schema =
365            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
366
367        let table_schema = Arc::new(Schema::new(vec![
368            Field::new("c1", DataType::Int32, true),
369            Field::new("c2", DataType::Utf8, true),
370        ]));
371
372        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
373
374        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
375        // the default behavior is to fill in missing columns with nulls.
376        // Thus this predicate will come back as false.
377        let filter = col("c2").eq(lit("abc"));
378        let rt = RoundTrip::new()
379            .with_table_schema(table_schema.clone())
380            .with_predicate(filter.clone())
381            .with_pushdown_predicate()
382            .round_trip(vec![batch.clone()])
383            .await;
384        let total_rows = rt
385            .batches
386            .unwrap()
387            .iter()
388            .map(|b| b.num_rows())
389            .sum::<usize>();
390        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
391        let metrics = rt.parquet_exec.metrics().unwrap();
392        let metric = get_value(&metrics, "pushdown_rows_pruned");
393        assert_eq!(metric, 3, "Expected all rows to be pruned");
394
395        // If we explicitly allow nulls the rest of the predicate should work
396        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
397        let rt = RoundTrip::new()
398            .with_table_schema(table_schema.clone())
399            .with_predicate(filter.clone())
400            .with_pushdown_predicate()
401            .round_trip(vec![batch.clone()])
402            .await;
403        let batches = rt.batches.unwrap();
404
405        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
406        +----+----+
407        | c1 | c2 |
408        +----+----+
409        | 1  |    |
410        +----+----+
411        "###);
412
413        let metrics = rt.parquet_exec.metrics().unwrap();
414        let metric = get_value(&metrics, "pushdown_rows_pruned");
415        assert_eq!(metric, 2, "Expected all rows to be pruned");
416    }
417
418    #[tokio::test]
419    async fn test_pushdown_with_missing_middle_column() {
420        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
421        let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
422
423        let file_schema = Arc::new(Schema::new(vec![
424            Field::new("c1", DataType::Int32, true),
425            Field::new("c3", DataType::Int32, true),
426        ]));
427
428        let table_schema = Arc::new(Schema::new(vec![
429            Field::new("c1", DataType::Int32, true),
430            Field::new("c2", DataType::Utf8, true),
431            Field::new("c3", DataType::Int32, true),
432        ]));
433
434        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
435
436        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
437        // the default behavior is to fill in missing columns with nulls.
438        // Thus this predicate will come back as false.
439        let filter = col("c2").eq(lit("abc"));
440        let rt = RoundTrip::new()
441            .with_table_schema(table_schema.clone())
442            .with_predicate(filter.clone())
443            .with_pushdown_predicate()
444            .round_trip(vec![batch.clone()])
445            .await;
446        let total_rows = rt
447            .batches
448            .unwrap()
449            .iter()
450            .map(|b| b.num_rows())
451            .sum::<usize>();
452        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
453        let metrics = rt.parquet_exec.metrics().unwrap();
454        let metric = get_value(&metrics, "pushdown_rows_pruned");
455        assert_eq!(metric, 3, "Expected all rows to be pruned");
456
457        // If we explicitly allow nulls the rest of the predicate should work
458        let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
459        let rt = RoundTrip::new()
460            .with_table_schema(table_schema.clone())
461            .with_predicate(filter.clone())
462            .with_pushdown_predicate()
463            .round_trip(vec![batch.clone()])
464            .await;
465        let batches = rt.batches.unwrap();
466
467        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
468        +----+----+----+
469        | c1 | c2 | c3 |
470        +----+----+----+
471        | 1  |    | 7  |
472        +----+----+----+
473        "###);
474
475        let metrics = rt.parquet_exec.metrics().unwrap();
476        let metric = get_value(&metrics, "pushdown_rows_pruned");
477        assert_eq!(metric, 2, "Expected all rows to be pruned");
478    }
479
480    #[tokio::test]
481    async fn test_pushdown_with_file_column_order_mismatch() {
482        let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
483
484        let file_schema = Arc::new(Schema::new(vec![
485            Field::new("c3", DataType::Int32, true),
486            Field::new("c3", DataType::Int32, true),
487        ]));
488
489        let table_schema = Arc::new(Schema::new(vec![
490            Field::new("c1", DataType::Int32, true),
491            Field::new("c2", DataType::Utf8, true),
492            Field::new("c3", DataType::Int32, true),
493        ]));
494
495        let batch =
496            RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();
497
498        // Since c2 is missing from the file and we didn't supply a custom `SchemaAdapterFactory`,
499        // the default behavior is to fill in missing columns with nulls.
500        // Thus this predicate will come back as false.
501        let filter = col("c2").eq(lit("abc"));
502        let rt = RoundTrip::new()
503            .with_table_schema(table_schema.clone())
504            .with_predicate(filter.clone())
505            .with_pushdown_predicate()
506            .round_trip(vec![batch.clone()])
507            .await;
508        let total_rows = rt
509            .batches
510            .unwrap()
511            .iter()
512            .map(|b| b.num_rows())
513            .sum::<usize>();
514        assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
515        let metrics = rt.parquet_exec.metrics().unwrap();
516        let metric = get_value(&metrics, "pushdown_rows_pruned");
517        assert_eq!(metric, 3, "Expected all rows to be pruned");
518
519        // If we explicitly allow nulls the rest of the predicate should work
520        let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
521        let rt = RoundTrip::new()
522            .with_table_schema(table_schema.clone())
523            .with_predicate(filter.clone())
524            .with_pushdown_predicate()
525            .round_trip(vec![batch.clone()])
526            .await;
527        let batches = rt.batches.unwrap();
528
529        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
530        +----+----+----+
531        | c1 | c2 | c3 |
532        +----+----+----+
533        |    |    | 7  |
534        +----+----+----+
535        "###);
536
537        let metrics = rt.parquet_exec.metrics().unwrap();
538        let metric = get_value(&metrics, "pushdown_rows_pruned");
539        assert_eq!(metric, 2, "Expected all rows to be pruned");
540    }
541
542    #[tokio::test]
543    async fn test_pushdown_with_missing_column_nested_conditions() {
544        // Create test data with c1 and c3 columns
545        let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
546        let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));
547
548        let file_schema = Arc::new(Schema::new(vec![
549            Field::new("c1", DataType::Int32, true),
550            Field::new("c3", DataType::Int32, true),
551        ]));
552
553        let table_schema = Arc::new(Schema::new(vec![
554            Field::new("c1", DataType::Int32, true),
555            Field::new("c2", DataType::Int32, true),
556            Field::new("c3", DataType::Int32, true),
557        ]));
558
559        let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
560
561        // Test with complex nested AND/OR:
562        // (c1 = 1 OR c2 = 5) AND (c3 = 10 OR c2 IS NULL)
563        // Should return 1 row where c1=1 AND c3=10 (since c2 IS NULL is always true)
564        let filter = col("c1")
565            .eq(lit(1_i32))
566            .or(col("c2").eq(lit(5_i32)))
567            .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));
568
569        let rt = RoundTrip::new()
570            .with_table_schema(table_schema.clone())
571            .with_predicate(filter.clone())
572            .with_pushdown_predicate()
573            .round_trip(vec![batch.clone()])
574            .await;
575
576        let batches = rt.batches.unwrap();
577
578        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
579        +----+----+----+
580        | c1 | c2 | c3 |
581        +----+----+----+
582        | 1  |    | 10 |
583        +----+----+----+
584        "###);
585
586        let metrics = rt.parquet_exec.metrics().unwrap();
587        let metric = get_value(&metrics, "pushdown_rows_pruned");
588        assert_eq!(metric, 4, "Expected 4 rows to be pruned");
589
590        // Test a more complex nested condition:
591        // (c1 < 3 AND c2 IS NOT NULL) OR (c3 > 20 AND c2 IS NULL)
592        // First part should return 0 rows (c2 IS NOT NULL is always false)
593        // Second part should return rows where c3 > 20 (3 rows: where c3 is 30, 40, 50)
594        let filter = col("c1")
595            .lt(lit(3_i32))
596            .and(col("c2").is_not_null())
597            .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));
598
599        let rt = RoundTrip::new()
600            .with_table_schema(table_schema)
601            .with_predicate(filter.clone())
602            .with_pushdown_predicate()
603            .round_trip(vec![batch])
604            .await;
605
606        let batches = rt.batches.unwrap();
607
608        insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
609        +----+----+----+
610        | c1 | c2 | c3 |
611        +----+----+----+
612        | 3  |    | 30 |
613        | 4  |    | 40 |
614        | 5  |    | 50 |
615        +----+----+----+
616        "###);
617
618        let metrics = rt.parquet_exec.metrics().unwrap();
619        let metric = get_value(&metrics, "pushdown_rows_pruned");
620        assert_eq!(metric, 2, "Expected 2 rows to be pruned");
621    }
622
623    #[tokio::test]
624    async fn evolved_schema() {
625        let c1: ArrayRef =
626            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
627        // batch1: c1(string)
628        let batch1 =
629            add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::empty())), "c1", c1);
630
631        // batch2: c1(string) and c2(int64)
632        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
633        let batch2 = add_to_batch(&batch1, "c2", c2);
634
635        // batch3: c1(string) and c3(int8)
636        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
637        let batch3 = add_to_batch(&batch1, "c3", c3);
638
639        // read/write them files:
640        let read = RoundTrip::new()
641            .round_trip_to_batches(vec![batch1, batch2, batch3])
642            .await
643            .unwrap();
644
645        insta::assert_snapshot!(batches_to_sort_string(&read), @r###"
646        +-----+----+----+
647        | c1  | c2 | c3 |
648        +-----+----+----+
649        |     |    |    |
650        |     |    | 20 |
651        |     | 2  |    |
652        | Foo |    |    |
653        | Foo |    | 10 |
654        | Foo | 1  |    |
655        | bar |    |    |
656        | bar |    |    |
657        | bar |    |    |
658        +-----+----+----+
659        "###);
660    }
661
662    #[tokio::test]
663    async fn evolved_schema_inconsistent_order() {
664        let c1: ArrayRef =
665            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
666
667        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
668
669        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
670
671        // batch1: c1(string), c2(int64), c3(int8)
672        let batch1 = create_batch(vec![
673            ("c1", c1.clone()),
674            ("c2", c2.clone()),
675            ("c3", c3.clone()),
676        ]);
677
678        // batch2: c3(int8), c2(int64), c1(string)
679        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
680
681        // read/write them files:
682        let read = RoundTrip::new()
683            .round_trip_to_batches(vec![batch1, batch2])
684            .await
685            .unwrap();
686
687        insta::assert_snapshot!(batches_to_sort_string(&read),@r"
688        +-----+----+----+
689        | c1  | c2 | c3 |
690        +-----+----+----+
691        |     | 2  | 20 |
692        |     | 2  | 20 |
693        | Foo | 1  | 10 |
694        | Foo | 1  | 10 |
695        | bar |    |    |
696        | bar |    |    |
697        +-----+----+----+
698        ");
699    }
700
701    #[tokio::test]
702    async fn evolved_schema_intersection() {
703        let c1: ArrayRef =
704            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
705
706        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
707
708        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
709
710        // batch1: c1(string), c2(int64), c3(int8)
711        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
712
713        // batch2: c3(int8), c2(int64), c1(string)
714        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
715
716        // read/write them files:
717        let read = RoundTrip::new()
718            .round_trip_to_batches(vec![batch1, batch2])
719            .await
720            .unwrap();
721
722        insta::assert_snapshot!(batches_to_sort_string(&read),@r"
723        +-----+----+----+
724        | c1  | c3 | c2 |
725        +-----+----+----+
726        |     |    |    |
727        |     | 10 | 1  |
728        |     | 20 |    |
729        |     | 20 | 2  |
730        | Foo | 10 |    |
731        | bar |    |    |
732        +-----+----+----+
733        ");
734    }
735
736    #[tokio::test]
737    async fn evolved_schema_intersection_filter() {
738        let c1: ArrayRef =
739            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
740
741        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
742
743        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
744
745        // batch1: c1(string), c3(int8)
746        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
747
748        // batch2: c3(int8), c2(int64)
749        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
750
751        let filter = col("c2").eq(lit(2_i64));
752
753        // read/write them files:
754        let read = RoundTrip::new()
755            .with_predicate(filter)
756            .round_trip_to_batches(vec![batch1, batch2])
757            .await
758            .unwrap();
759
760        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
761            +-----+----+----+
762            | c1  | c3 | c2 |
763            +-----+----+----+
764            |     |    |    |
765            |     | 10 | 1  |
766            |     | 20 |    |
767            |     | 20 | 2  |
768            | Foo | 10 |    |
769            | bar |    |    |
770            +-----+----+----+
771        "###);
772    }
773
774    #[tokio::test]
775    async fn evolved_schema_intersection_filter_with_filter_pushdown() {
776        let c1: ArrayRef =
777            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
778        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
779        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
780        // batch1: c1(string), c3(int8)
781        let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
782        // batch2: c3(int8), c2(int64)
783        let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
784        let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
785        // read/write them files:
786        let rt = RoundTrip::new()
787            .with_predicate(filter)
788            .with_pushdown_predicate()
789            .round_trip(vec![batch1, batch2])
790            .await;
791
792        insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
793        +----+----+----+
794        | c1 | c3 | c2 |
795        +----+----+----+
796        |    | 10 | 1  |
797        |    | 20 | 2  |
798        +----+----+----+
799        "###);
800        let metrics = rt.parquet_exec.metrics().unwrap();
801        // Note there are were 6 rows in total (across three batches)
802        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 4);
803        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
804    }
805
806    #[tokio::test]
807    async fn evolved_schema_projection() {
808        let c1: ArrayRef =
809            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
810
811        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
812
813        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
814
815        let c4: ArrayRef =
816            Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
817
818        // batch1: c1(string), c2(int64), c3(int8)
819        let batch1 = create_batch(vec![
820            ("c1", c1.clone()),
821            ("c2", c2.clone()),
822            ("c3", c3.clone()),
823        ]);
824
825        // batch2: c3(int8), c2(int64), c1(string), c4(string)
826        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
827
828        // read/write them files:
829        let read = RoundTrip::new()
830            .with_projection(vec![0, 3])
831            .round_trip_to_batches(vec![batch1, batch2])
832            .await
833            .unwrap();
834
835        insta::assert_snapshot!(batches_to_sort_string(&read), @r###"
836        +-----+-----+
837        | c1  | c4  |
838        +-----+-----+
839        |     |     |
840        |     | boo |
841        | Foo |     |
842        | Foo | baz |
843        | bar |     |
844        | bar |     |
845        +-----+-----+
846        "###);
847    }
848
849    #[tokio::test]
850    async fn evolved_schema_column_order_filter() {
851        let c1: ArrayRef =
852            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
853
854        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
855
856        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
857
858        // batch1: c1(string), c2(int64), c3(int8)
859        let batch1 = create_batch(vec![
860            ("c1", c1.clone()),
861            ("c2", c2.clone()),
862            ("c3", c3.clone()),
863        ]);
864
865        // batch2: c3(int8), c2(int64), c1(string)
866        let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
867
868        let filter = col("c3").eq(lit(0_i8));
869
870        // read/write them files:
871        let read = RoundTrip::new()
872            .with_predicate(filter)
873            .round_trip_to_batches(vec![batch1, batch2])
874            .await
875            .unwrap();
876
877        // Predicate should prune all row groups
878        assert_eq!(read.len(), 0);
879    }
880
881    #[tokio::test]
882    async fn evolved_schema_column_type_filter_strings() {
883        // The table and filter have a common data type, but the file schema differs
884        let c1: ArrayRef =
885            Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
886        let batch = create_batch(vec![("c1", c1.clone())]);
887
888        // Table schema is Utf8 but file schema is StringView
889        let table_schema =
890            Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
891
892        // Predicate should prune all row groups
893        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
894        let rt = RoundTrip::new()
895            .with_predicate(filter)
896            .with_table_schema(table_schema.clone())
897            .round_trip(vec![batch.clone()])
898            .await;
899        // There should be no predicate evaluation errors
900        let metrics = rt.parquet_exec.metrics().unwrap();
901        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
902        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
903        assert_eq!(rt.batches.unwrap().len(), 0);
904
905        // Predicate should prune no row groups
906        let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
907        let rt = RoundTrip::new()
908            .with_predicate(filter)
909            .with_table_schema(table_schema)
910            .round_trip(vec![batch])
911            .await;
912        // There should be no predicate evaluation errors
913        let metrics = rt.parquet_exec.metrics().unwrap();
914        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
915        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
916        let read = rt
917            .batches
918            .unwrap()
919            .iter()
920            .map(|b| b.num_rows())
921            .sum::<usize>();
922        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
923    }
924
925    #[tokio::test]
926    async fn evolved_schema_column_type_filter_ints() {
927        // The table and filter have a common data type, but the file schema differs
928        let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
929        let batch = create_batch(vec![("c1", c1.clone())]);
930
931        let table_schema =
932            Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
933
934        // Predicate should prune all row groups
935        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
936        let rt = RoundTrip::new()
937            .with_predicate(filter)
938            .with_table_schema(table_schema.clone())
939            .round_trip(vec![batch.clone()])
940            .await;
941        // There should be no predicate evaluation errors
942        let metrics = rt.parquet_exec.metrics().unwrap();
943        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
944        assert_eq!(rt.batches.unwrap().len(), 0);
945
946        // Predicate should prune no row groups
947        let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
948        let rt = RoundTrip::new()
949            .with_predicate(filter)
950            .with_table_schema(table_schema)
951            .round_trip(vec![batch])
952            .await;
953        // There should be no predicate evaluation errors
954        let metrics = rt.parquet_exec.metrics().unwrap();
955        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
956        let read = rt
957            .batches
958            .unwrap()
959            .iter()
960            .map(|b| b.num_rows())
961            .sum::<usize>();
962        assert_eq!(read, 2, "Expected 2 rows to match the predicate");
963    }
964
965    #[tokio::test]
966    async fn evolved_schema_column_type_filter_timestamp_units() {
967        // The table and filter have a common data type
968        // The table schema is in milliseconds, but the file schema is in nanoseconds
969        let c1: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
970            Some(1_000_000_000), // 1970-01-01T00:00:01Z
971            Some(2_000_000_000), // 1970-01-01T00:00:02Z
972            Some(3_000_000_000), // 1970-01-01T00:00:03Z
973            Some(4_000_000_000), // 1970-01-01T00:00:04Z
974        ]));
975        let batch = create_batch(vec![("c1", c1.clone())]);
976        let table_schema = Arc::new(Schema::new(vec![Field::new(
977            "c1",
978            DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
979            false,
980        )]));
981        // One row should match, 2 pruned via page index, 1 pruned via filter pushdown
982        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
983            Some(1_000),
984            Some("UTC".into()),
985        )));
986        let rt = RoundTrip::new()
987            .with_predicate(filter)
988            .with_pushdown_predicate()
989            .with_page_index_predicate() // produces pages with 2 rows each (2 pages total for our data)
990            .with_table_schema(table_schema.clone())
991            .round_trip(vec![batch.clone()])
992            .await;
993        // There should be no predicate evaluation errors and we keep 1 row
994        let metrics = rt.parquet_exec.metrics().unwrap();
995        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
996        let read = rt
997            .batches
998            .unwrap()
999            .iter()
1000            .map(|b| b.num_rows())
1001            .sum::<usize>();
1002        assert_eq!(read, 1, "Expected 1 rows to match the predicate");
1003        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
1004        assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
1005        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
1006        // If we filter with a value that is completely out of the range of the data
1007        // we prune at the row group level.
1008        let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
1009            Some(5_000),
1010            Some("UTC".into()),
1011        )));
1012        let rt = RoundTrip::new()
1013            .with_predicate(filter)
1014            .with_pushdown_predicate()
1015            .with_table_schema(table_schema)
1016            .round_trip(vec![batch])
1017            .await;
1018        // There should be no predicate evaluation errors and we keep 0 rows
1019        let metrics = rt.parquet_exec.metrics().unwrap();
1020        assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
1021        let read = rt
1022            .batches
1023            .unwrap()
1024            .iter()
1025            .map(|b| b.num_rows())
1026            .sum::<usize>();
1027        assert_eq!(read, 0, "Expected 0 rows to match the predicate");
1028        assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 1);
1029    }
1030
1031    #[tokio::test]
1032    async fn evolved_schema_disjoint_schema_filter() {
1033        let c1: ArrayRef =
1034            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1035
1036        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1037
1038        // batch1: c1(string)
1039        let batch1 = create_batch(vec![("c1", c1.clone())]);
1040
1041        // batch2: c2(int64)
1042        let batch2 = create_batch(vec![("c2", c2)]);
1043
1044        let filter = col("c2").eq(lit(1_i64));
1045
1046        // read/write them files:
1047        let read = RoundTrip::new()
1048            .with_predicate(filter)
1049            .round_trip_to_batches(vec![batch1, batch2])
1050            .await
1051            .unwrap();
1052
1053        // This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 0`
1054        // but parquet pruning is not exact. If the min/max values are not defined (which they are not in this case since the it is
1055        // a null array, then the pruning predicate (currently) can not be applied.
1056        // In a real query where this predicate was pushed down from a filter stage instead of created directly in the `DataSourceExec`,
1057        // the filter stage would be preserved as a separate execution plan stage so the actual query results would be as expected.
1058
1059        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1060            +-----+----+
1061            | c1  | c2 |
1062            +-----+----+
1063            |     |    |
1064            |     |    |
1065            |     | 1  |
1066            |     | 2  |
1067            | Foo |    |
1068            | bar |    |
1069            +-----+----+
1070        "###);
1071    }
1072
1073    #[tokio::test]
1074    async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
1075        let c1: ArrayRef =
1076            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1077
1078        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1079
1080        // batch1: c1(string)
1081        let batch1 = create_batch(vec![("c1", c1.clone())]);
1082
1083        // batch2: c2(int64)
1084        let batch2 = create_batch(vec![("c2", c2)]);
1085
1086        let filter = col("c2").eq(lit(1_i64));
1087
1088        // read/write them files:
1089        let rt = RoundTrip::new()
1090            .with_predicate(filter)
1091            .with_pushdown_predicate()
1092            .round_trip(vec![batch1, batch2])
1093            .await;
1094
1095        insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
1096        +----+----+
1097        | c1 | c2 |
1098        +----+----+
1099        |    | 1  |
1100        +----+----+
1101        "###);
1102        let metrics = rt.parquet_exec.metrics().unwrap();
1103        // Note there are were 6 rows in total (across three batches)
1104        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
1105        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 1);
1106    }
1107
1108    #[tokio::test]
1109    async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
1110        let c1: ArrayRef = Arc::new(StringArray::from(vec![
1111            // Page 1
1112            Some("Foo"),
1113            Some("Bar"),
1114            // Page 2
1115            Some("Foo2"),
1116            Some("Bar2"),
1117            // Page 3
1118            Some("Foo3"),
1119            Some("Bar3"),
1120        ]));
1121
1122        let c2: ArrayRef = Arc::new(Int64Array::from(vec![
1123            // Page 1:
1124            Some(1),
1125            Some(2),
1126            // Page 2: (pruned)
1127            Some(3),
1128            Some(4),
1129            // Page 3: (pruned)
1130            Some(5),
1131            None,
1132        ]));
1133
1134        // batch1: c1(string)
1135        let batch1 = create_batch(vec![("c1", c1.clone())]);
1136
1137        // batch2: c2(int64)
1138        let batch2 = create_batch(vec![("c2", c2.clone())]);
1139
1140        // batch3 (has c2, c1) -- both columns, should still prune
1141        let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1142
1143        // batch4 (has c2, c1) -- different column order, should still prune
1144        let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
1145
1146        let filter = col("c2").eq(lit(1_i64));
1147
1148        // read/write them files:
1149        let rt = RoundTrip::new()
1150            .with_predicate(filter)
1151            .with_page_index_predicate()
1152            .round_trip(vec![batch1, batch2, batch3, batch4])
1153            .await;
1154
1155        insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
1156        +------+----+
1157        | c1   | c2 |
1158        +------+----+
1159        |      | 1  |
1160        |      | 2  |
1161        | Bar  |    |
1162        | Bar  | 2  |
1163        | Bar  | 2  |
1164        | Bar2 |    |
1165        | Bar3 |    |
1166        | Foo  |    |
1167        | Foo  | 1  |
1168        | Foo  | 1  |
1169        | Foo2 |    |
1170        | Foo3 |    |
1171        +------+----+
1172        "###);
1173        let metrics = rt.parquet_exec.metrics().unwrap();
1174
1175        // There are 4 rows pruned in each of batch2, batch3, and
1176        // batch4 for a total of 12. batch1 had no pruning as c2 was
1177        // filled in as null
1178        let (page_index_pruned, page_index_matched) =
1179            get_pruning_metric(&metrics, "page_index_rows_pruned");
1180        assert_eq!(page_index_pruned, 12);
1181        assert_eq!(page_index_matched, 6);
1182    }
1183
1184    #[tokio::test]
1185    async fn multi_column_predicate_pushdown() {
1186        let c1: ArrayRef =
1187            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1188
1189        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1190
1191        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1192
1193        // Columns in different order to schema
1194        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
1195
1196        // read/write them files:
1197        let read = RoundTrip::new()
1198            .with_predicate(filter)
1199            .with_pushdown_predicate()
1200            .round_trip_to_batches(vec![batch1])
1201            .await
1202            .unwrap();
1203
1204        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1205            +-----+----+
1206            | c1  | c2 |
1207            +-----+----+
1208            | Foo | 1  |
1209            | bar |    |
1210            +-----+----+
1211        "###);
1212    }
1213
1214    #[tokio::test]
1215    async fn multi_column_predicate_pushdown_page_index_pushdown() {
1216        let c1: ArrayRef =
1217            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1218
1219        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1220
1221        let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1222
1223        // Columns in different order to schema
1224        let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
1225
1226        // read/write them files:
1227        let read = RoundTrip::new()
1228            .with_predicate(filter)
1229            .with_page_index_predicate()
1230            .round_trip_to_batches(vec![batch1])
1231            .await
1232            .unwrap();
1233
1234        insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1235            +-----+----+
1236            | c1  | c2 |
1237            +-----+----+
1238            |     | 2  |
1239            | Foo | 1  |
1240            | bar |    |
1241            +-----+----+
1242        "###);
1243    }
1244
1245    #[tokio::test]
1246    async fn evolved_schema_incompatible_types() {
1247        let c1: ArrayRef =
1248            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1249
1250        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1251
1252        let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
1253
1254        let c4: ArrayRef = Arc::new(Date64Array::from(vec![
1255            Some(86400000),
1256            None,
1257            Some(259200000),
1258        ]));
1259
1260        // batch1: c1(string), c2(int64), c3(int8)
1261        let batch1 = create_batch(vec![
1262            ("c1", c1.clone()),
1263            ("c2", c2.clone()),
1264            ("c3", c3.clone()),
1265        ]);
1266
1267        // batch2: c3(int8), c2(int64), c1(string), c4(string)
1268        let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
1269
1270        let table_schema = Schema::new(vec![
1271            Field::new("c1", DataType::Utf8, true),
1272            Field::new("c2", DataType::Int64, true),
1273            Field::new("c3", DataType::Int8, true),
1274        ]);
1275
1276        // read/write them files:
1277        let read = RoundTrip::new()
1278            .with_table_schema(Arc::new(table_schema))
1279            .round_trip_to_batches(vec![batch1, batch2])
1280            .await;
1281        assert_contains!(read.unwrap_err().to_string(),
1282            "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
1283    }
1284
1285    #[tokio::test]
1286    async fn parquet_exec_with_projection() -> Result<()> {
1287        let testdata = datafusion_common::test_util::parquet_test_data();
1288        let filename = "alltypes_plain.parquet";
1289        let session_ctx = SessionContext::new();
1290        let state = session_ctx.state();
1291        let task_ctx = state.task_ctx();
1292        let parquet_exec = scan_format(
1293            &state,
1294            &ParquetFormat::default(),
1295            None,
1296            &testdata,
1297            filename,
1298            Some(vec![0, 1, 2]),
1299            None,
1300        )
1301        .await
1302        .unwrap();
1303        assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1304
1305        let mut results = parquet_exec.execute(0, task_ctx)?;
1306        let batch = results.next().await.unwrap()?;
1307
1308        assert_eq!(8, batch.num_rows());
1309        assert_eq!(3, batch.num_columns());
1310
1311        let schema = batch.schema();
1312        let field_names: Vec<&str> =
1313            schema.fields().iter().map(|f| f.name().as_str()).collect();
1314        assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
1315
1316        let batch = results.next().await;
1317        assert!(batch.is_none());
1318
1319        let batch = results.next().await;
1320        assert!(batch.is_none());
1321
1322        let batch = results.next().await;
1323        assert!(batch.is_none());
1324
1325        Ok(())
1326    }
1327
1328    #[tokio::test]
1329    async fn parquet_exec_with_int96_from_spark() -> Result<()> {
1330        // arrow-rs relies on the chrono library to convert between timestamps and strings, so
1331        // instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
1332        // anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter.
1333
1334        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
1335        let testdata = datafusion_common::test_util::parquet_test_data();
1336        let filename = "int96_from_spark.parquet";
1337        let session_ctx = SessionContext::new();
1338        let state = session_ctx.state();
1339        let task_ctx = state.task_ctx();
1340
1341        let time_units_and_expected = vec![
1342            (
1343                None, // Same as "ns" time_unit
1344                Arc::new(Int64Array::from(vec![
1345                    Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
1346                    Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
1347                    Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
1348                    Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s)
1349                    None,
1350                    Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
1351                ])),
1352            ),
1353            (
1354                Some("ns".to_string()),
1355                Arc::new(Int64Array::from(vec![
1356                    Some(1704141296123456000),
1357                    Some(1704070800000000000),
1358                    Some(-4852191831933722624),
1359                    Some(1735599600000000000),
1360                    None,
1361                    Some(-4864435138808946688),
1362                ])),
1363            ),
1364            (
1365                Some("us".to_string()),
1366                Arc::new(Int64Array::from(vec![
1367                    Some(1704141296123456),
1368                    Some(1704070800000000),
1369                    Some(253402225200000000),
1370                    Some(1735599600000000),
1371                    None,
1372                    Some(9089380393200000000),
1373                ])),
1374            ),
1375        ];
1376
1377        for (time_unit, expected) in time_units_and_expected {
1378            let parquet_exec = scan_format(
1379                &state,
1380                &ParquetFormat::default().with_coerce_int96(time_unit.clone()),
1381                Some(schema.clone()),
1382                &testdata,
1383                filename,
1384                Some(vec![0]),
1385                None,
1386            )
1387            .await
1388            .unwrap();
1389            assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1390
1391            let mut results = parquet_exec.execute(0, task_ctx.clone())?;
1392            let batch = results.next().await.unwrap()?;
1393
1394            assert_eq!(6, batch.num_rows());
1395            assert_eq!(1, batch.num_columns());
1396
1397            assert_eq!(batch.num_columns(), 1);
1398            let column = batch.column(0);
1399
1400            assert_eq!(column.len(), expected.len());
1401
1402            column
1403                .as_primitive::<arrow::datatypes::Int64Type>()
1404                .iter()
1405                .zip(expected.iter())
1406                .for_each(|(lhs, rhs)| {
1407                    assert_eq!(lhs, rhs);
1408                });
1409        }
1410
1411        Ok(())
1412    }
1413
1414    #[tokio::test]
1415    async fn parquet_exec_with_int96_nested() -> Result<()> {
1416        // This test ensures that we maintain compatibility with coercing int96 to the desired
1417        // resolution when they're within a nested type (e.g., struct, map, list). This file
1418        // originates from a modified CometFuzzTestSuite ParquetGenerator to generate combinations
1419        // of primitive and complex columns using int96. Other tests cover reading the data
1420        // correctly with this coercion. Here we're only checking the coerced schema is correct.
1421        let testdata = "../../datafusion/core/tests/data";
1422        let filename = "int96_nested.parquet";
1423        let session_ctx = SessionContext::new();
1424        let state = session_ctx.state();
1425        let task_ctx = state.task_ctx();
1426
1427        let parquet_exec = scan_format(
1428            &state,
1429            &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
1430            None,
1431            testdata,
1432            filename,
1433            None,
1434            None,
1435        )
1436        .await
1437        .unwrap();
1438        assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1439
1440        let mut results = parquet_exec.execute(0, task_ctx.clone())?;
1441        let batch = results.next().await.unwrap()?;
1442
1443        let expected_schema = Arc::new(Schema::new(vec![
1444            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1445            Field::new_struct(
1446                "c1",
1447                vec![Field::new(
1448                    "c0",
1449                    DataType::Timestamp(TimeUnit::Microsecond, None),
1450                    true,
1451                )],
1452                true,
1453            ),
1454            Field::new_struct(
1455                "c2",
1456                vec![Field::new_list(
1457                    "c0",
1458                    Field::new(
1459                        "element",
1460                        DataType::Timestamp(TimeUnit::Microsecond, None),
1461                        true,
1462                    ),
1463                    true,
1464                )],
1465                true,
1466            ),
1467            Field::new_map(
1468                "c3",
1469                "key_value",
1470                Field::new(
1471                    "key",
1472                    DataType::Timestamp(TimeUnit::Microsecond, None),
1473                    false,
1474                ),
1475                Field::new(
1476                    "value",
1477                    DataType::Timestamp(TimeUnit::Microsecond, None),
1478                    true,
1479                ),
1480                false,
1481                true,
1482            ),
1483            Field::new_list(
1484                "c4",
1485                Field::new(
1486                    "element",
1487                    DataType::Timestamp(TimeUnit::Microsecond, None),
1488                    true,
1489                ),
1490                true,
1491            ),
1492            Field::new_list(
1493                "c5",
1494                Field::new_struct(
1495                    "element",
1496                    vec![Field::new(
1497                        "c0",
1498                        DataType::Timestamp(TimeUnit::Microsecond, None),
1499                        true,
1500                    )],
1501                    true,
1502                ),
1503                true,
1504            ),
1505            Field::new_list(
1506                "c6",
1507                Field::new_map(
1508                    "element",
1509                    "key_value",
1510                    Field::new(
1511                        "key",
1512                        DataType::Timestamp(TimeUnit::Microsecond, None),
1513                        false,
1514                    ),
1515                    Field::new(
1516                        "value",
1517                        DataType::Timestamp(TimeUnit::Microsecond, None),
1518                        true,
1519                    ),
1520                    false,
1521                    true,
1522                ),
1523                true,
1524            ),
1525        ]));
1526
1527        assert_eq!(batch.schema(), expected_schema);
1528
1529        Ok(())
1530    }
1531
1532    #[tokio::test]
1533    async fn parquet_exec_with_range() -> Result<()> {
1534        fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
1535            PartitionedFile {
1536                object_meta: meta.clone(),
1537                partition_values: vec![],
1538                range: Some(FileRange { start, end }),
1539                statistics: None,
1540                extensions: None,
1541                metadata_size_hint: None,
1542            }
1543        }
1544
1545        async fn assert_parquet_read(
1546            state: &SessionState,
1547            file_groups: Vec<FileGroup>,
1548            expected_row_num: Option<usize>,
1549            file_schema: SchemaRef,
1550        ) -> Result<()> {
1551            let config = FileScanConfigBuilder::new(
1552                ObjectStoreUrl::local_filesystem(),
1553                file_schema,
1554                Arc::new(ParquetSource::default()),
1555            )
1556            .with_file_groups(file_groups)
1557            .build();
1558
1559            let parquet_exec = DataSourceExec::from_data_source(config);
1560            assert_eq!(
1561                parquet_exec
1562                    .properties()
1563                    .output_partitioning()
1564                    .partition_count(),
1565                1
1566            );
1567            let results = parquet_exec.execute(0, state.task_ctx())?.next().await;
1568
1569            if let Some(expected_row_num) = expected_row_num {
1570                let batch = results.unwrap()?;
1571                assert_eq!(expected_row_num, batch.num_rows());
1572            } else {
1573                assert!(results.is_none());
1574            }
1575
1576            Ok(())
1577        }
1578
1579        let session_ctx = SessionContext::new();
1580        let state = session_ctx.state();
1581
1582        let testdata = datafusion_common::test_util::parquet_test_data();
1583        let filename = format!("{testdata}/alltypes_plain.parquet");
1584
1585        let meta = local_unpartitioned_file(filename);
1586
1587        let store = Arc::new(LocalFileSystem::new()) as _;
1588        let file_schema = ParquetFormat::default()
1589            .infer_schema(&state, &store, std::slice::from_ref(&meta))
1590            .await?;
1591
1592        let group_empty = vec![FileGroup::new(vec![file_range(&meta, 0, 2)])];
1593        let group_contain = vec![FileGroup::new(vec![file_range(&meta, 2, i64::MAX)])];
1594        let group_all = vec![FileGroup::new(vec![
1595            file_range(&meta, 0, 2),
1596            file_range(&meta, 2, i64::MAX),
1597        ])];
1598
1599        assert_parquet_read(&state, group_empty, None, file_schema.clone()).await?;
1600        assert_parquet_read(&state, group_contain, Some(8), file_schema.clone()).await?;
1601        assert_parquet_read(&state, group_all, Some(8), file_schema).await?;
1602
1603        Ok(())
1604    }
1605
1606    #[tokio::test]
1607    async fn parquet_exec_with_partition() -> Result<()> {
1608        let session_ctx = SessionContext::new();
1609        let state = session_ctx.state();
1610        let task_ctx = session_ctx.task_ctx();
1611
1612        let object_store_url = ObjectStoreUrl::local_filesystem();
1613        let store = state.runtime_env().object_store(&object_store_url).unwrap();
1614
1615        let testdata = datafusion_common::test_util::parquet_test_data();
1616        let filename = format!("{testdata}/alltypes_plain.parquet");
1617
1618        let meta = local_unpartitioned_file(filename);
1619
1620        let schema = ParquetFormat::default()
1621            .infer_schema(&state, &store, std::slice::from_ref(&meta))
1622            .await
1623            .unwrap();
1624
1625        let partitioned_file = PartitionedFile {
1626            object_meta: meta,
1627            partition_values: vec![
1628                ScalarValue::from("2021"),
1629                ScalarValue::UInt8(Some(10)),
1630                ScalarValue::Dictionary(
1631                    Box::new(DataType::UInt16),
1632                    Box::new(ScalarValue::from("26")),
1633                ),
1634            ],
1635            range: None,
1636            statistics: None,
1637            extensions: None,
1638            metadata_size_hint: None,
1639        };
1640
1641        let expected_schema = Schema::new(vec![
1642            Field::new("id", DataType::Int32, true),
1643            Field::new("bool_col", DataType::Boolean, true),
1644            Field::new("tinyint_col", DataType::Int32, true),
1645            Field::new("month", DataType::UInt8, false),
1646            Field::new(
1647                "day",
1648                DataType::Dictionary(
1649                    Box::new(DataType::UInt16),
1650                    Box::new(DataType::Utf8),
1651                ),
1652                false,
1653            ),
1654        ]);
1655
1656        let source = Arc::new(ParquetSource::default());
1657        let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source)
1658            .with_file(partitioned_file)
1659            // file has 10 cols so index 12 should be month and 13 should be day
1660            .with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
1661            .with_table_partition_cols(vec![
1662                Field::new("year", DataType::Utf8, false),
1663                Field::new("month", DataType::UInt8, false),
1664                Field::new(
1665                    "day",
1666                    DataType::Dictionary(
1667                        Box::new(DataType::UInt16),
1668                        Box::new(DataType::Utf8),
1669                    ),
1670                    false,
1671                ),
1672            ])
1673            .build();
1674
1675        let parquet_exec = DataSourceExec::from_data_source(config);
1676        let partition_count = parquet_exec
1677            .data_source()
1678            .output_partitioning()
1679            .partition_count();
1680        assert_eq!(partition_count, 1);
1681        assert_eq!(parquet_exec.schema().as_ref(), &expected_schema);
1682
1683        let mut results = parquet_exec.execute(0, task_ctx)?;
1684        let batch = results.next().await.unwrap()?;
1685        assert_eq!(batch.schema().as_ref(), &expected_schema);
1686
1687        assert_snapshot!(batches_to_string(&[batch]),@r###"
1688            +----+----------+-------------+-------+-----+
1689            | id | bool_col | tinyint_col | month | day |
1690            +----+----------+-------------+-------+-----+
1691            | 4  | true     | 0           | 10    | 26  |
1692            | 5  | false    | 1           | 10    | 26  |
1693            | 6  | true     | 0           | 10    | 26  |
1694            | 7  | false    | 1           | 10    | 26  |
1695            | 2  | true     | 0           | 10    | 26  |
1696            | 3  | false    | 1           | 10    | 26  |
1697            | 0  | true     | 0           | 10    | 26  |
1698            | 1  | false    | 1           | 10    | 26  |
1699            +----+----------+-------------+-------+-----+
1700        "###);
1701
1702        let batch = results.next().await;
1703        assert!(batch.is_none());
1704
1705        Ok(())
1706    }
1707
1708    #[tokio::test]
1709    async fn parquet_exec_with_error() -> Result<()> {
1710        let session_ctx = SessionContext::new();
1711        let state = session_ctx.state();
1712        let location = Path::from_filesystem_path(".")
1713            .unwrap()
1714            .child("invalid.parquet");
1715
1716        let partitioned_file = PartitionedFile {
1717            object_meta: ObjectMeta {
1718                location,
1719                last_modified: Utc.timestamp_nanos(0),
1720                size: 1337,
1721                e_tag: None,
1722                version: None,
1723            },
1724            partition_values: vec![],
1725            range: None,
1726            statistics: None,
1727            extensions: None,
1728            metadata_size_hint: None,
1729        };
1730
1731        let file_schema = Arc::new(Schema::empty());
1732        let config = FileScanConfigBuilder::new(
1733            ObjectStoreUrl::local_filesystem(),
1734            file_schema,
1735            Arc::new(ParquetSource::default()),
1736        )
1737        .with_file(partitioned_file)
1738        .build();
1739
1740        let parquet_exec = DataSourceExec::from_data_source(config);
1741
1742        let mut results = parquet_exec.execute(0, state.task_ctx())?;
1743        let batch = results.next().await.unwrap();
1744        // invalid file should produce an error to that effect
1745        assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found");
1746        assert!(results.next().await.is_none());
1747
1748        Ok(())
1749    }
1750
1751    #[tokio::test]
1752    async fn parquet_page_index_exec_metrics() {
1753        let c1: ArrayRef = Arc::new(Int32Array::from(vec![
1754            Some(1),
1755            None,
1756            Some(2),
1757            Some(3),
1758            Some(4),
1759            Some(5),
1760        ]));
1761        let batch1 = create_batch(vec![("int", c1.clone())]);
1762
1763        let filter = col("int").eq(lit(4_i32));
1764
1765        let rt = RoundTrip::new()
1766            .with_predicate(filter)
1767            .with_page_index_predicate()
1768            .round_trip(vec![batch1])
1769            .await;
1770
1771        let metrics = rt.parquet_exec.metrics().unwrap();
1772
1773        assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()),@r###"
1774            +-----+
1775            | int |
1776            +-----+
1777            | 4   |
1778            | 5   |
1779            +-----+
1780        "###);
1781        let (page_index_pruned, page_index_matched) =
1782            get_pruning_metric(&metrics, "page_index_rows_pruned");
1783        assert_eq!(page_index_pruned, 4);
1784        assert_eq!(page_index_matched, 2);
1785        assert!(
1786            get_value(&metrics, "page_index_eval_time") > 0,
1787            "no eval time in metrics: {metrics:#?}"
1788        );
1789    }
1790
1791    /// Returns a string array with contents:
1792    /// "[Foo, null, bar, bar, bar, bar, zzz]"
1793    fn string_batch() -> RecordBatch {
1794        let c1: ArrayRef = Arc::new(StringArray::from(vec![
1795            Some("Foo"),
1796            None,
1797            Some("bar"),
1798            Some("bar"),
1799            Some("bar"),
1800            Some("bar"),
1801            Some("zzz"),
1802        ]));
1803
1804        // batch1: c1(string)
1805        create_batch(vec![("c1", c1.clone())])
1806    }
1807
1808    #[tokio::test]
1809    async fn parquet_exec_metrics() {
1810        // batch1: c1(string)
1811        let batch1 = string_batch();
1812
1813        // c1 != 'bar'
1814        let filter = col("c1").not_eq(lit("bar"));
1815
1816        // read/write them files:
1817        let rt = RoundTrip::new()
1818            .with_predicate(filter)
1819            .with_pushdown_predicate()
1820            .round_trip(vec![batch1])
1821            .await;
1822
1823        let metrics = rt.parquet_exec.metrics().unwrap();
1824
1825        // assert the batches and some metrics
1826        assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
1827            +-----+
1828            | c1  |
1829            +-----+
1830            | Foo |
1831            | zzz |
1832            +-----+
1833        "###);
1834
1835        // pushdown predicates have eliminated all 4 bar rows and the
1836        // null row for 5 rows total
1837        assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
1838        assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
1839        assert!(
1840            get_value(&metrics, "row_pushdown_eval_time") > 0,
1841            "no pushdown eval time in metrics: {metrics:#?}"
1842        );
1843        assert!(
1844            get_value(&metrics, "statistics_eval_time") > 0,
1845            "no statistics eval time in metrics: {metrics:#?}"
1846        );
1847        assert!(
1848            get_value(&metrics, "bloom_filter_eval_time") > 0,
1849            "no Bloom Filter eval time in metrics: {metrics:#?}"
1850        );
1851    }
1852
1853    #[tokio::test]
1854    async fn parquet_exec_display() {
1855        // batch1: c1(string)
1856        let batch1 = string_batch();
1857
1858        // c1 != 'bar'
1859        let filter = col("c1").not_eq(lit("bar"));
1860
1861        let rt = RoundTrip::new()
1862            .with_predicate(filter)
1863            .with_pushdown_predicate()
1864            .round_trip(vec![batch1])
1865            .await;
1866
1867        let explain = rt.explain.unwrap();
1868
1869        // check that there was a pruning predicate -> row groups got pruned
1870        assert_contains!(&explain, "predicate=c1@0 != bar");
1871
1872        // there's a single row group, but we can check that it matched
1873        assert_contains!(
1874            &explain,
1875            "row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1876        );
1877
1878        // check the projection
1879        assert_contains!(&explain, "projection=[c1]");
1880    }
1881
1882    #[tokio::test]
1883    async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
1884        // batch1: c1(string)
1885        let batch1 = string_batch();
1886
1887        // filter is too complicated for pruning (PruningPredicate code does not
1888        // handle case expressions), so the pruning predicate will always be
1889        // "true"
1890
1891        // WHEN c1 != bar THEN true ELSE false END
1892        let filter = when(col("c1").not_eq(lit("bar")), lit(true))
1893            .otherwise(lit(false))
1894            .unwrap();
1895
1896        let rt = RoundTrip::new()
1897            .with_predicate(filter.clone())
1898            .with_pushdown_predicate()
1899            .round_trip(vec![batch1])
1900            .await;
1901
1902        // Should not contain a pruning predicate (since nothing can be pruned)
1903        let explain = rt.explain.unwrap();
1904
1905        // When both matched and pruned are 0, it means that the pruning predicate
1906        // was not used at all.
1907        assert_contains!(
1908            &explain,
1909            "row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1910        );
1911
1912        // But pushdown predicate should be present
1913        assert_contains!(
1914            &explain,
1915            "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
1916        );
1917        assert_contains!(&explain, "pushdown_rows_pruned=5");
1918    }
1919
1920    #[tokio::test]
1921    async fn parquet_exec_has_pruning_predicate_for_guarantees() {
1922        // batch1: c1(string)
1923        let batch1 = string_batch();
1924
1925        // part of the filter is too complicated for pruning (PruningPredicate code does not
1926        // handle case expressions), but part (c1 = 'foo') can be used for bloom filtering, so
1927        // should still have the pruning predicate.
1928
1929        // c1 = 'foo' AND (WHEN c1 != bar THEN true ELSE false END)
1930        let filter = col("c1").eq(lit("foo")).and(
1931            when(col("c1").not_eq(lit("bar")), lit(true))
1932                .otherwise(lit(false))
1933                .unwrap(),
1934        );
1935
1936        let rt = RoundTrip::new()
1937            .with_predicate(filter.clone())
1938            .with_pushdown_predicate()
1939            .with_bloom_filters()
1940            .round_trip(vec![batch1])
1941            .await;
1942
1943        // Should have a pruning predicate
1944        let explain = rt.explain.unwrap();
1945        assert_contains!(
1946            &explain,
1947            "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
1948        );
1949
1950        // And bloom filters should have been evaluated
1951        assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
1952    }
1953
1954    /// Returns the sum of all the metrics with the specified name
1955    /// the returned set.
1956    ///
1957    /// Count: returns value
1958    /// Time: returns elapsed nanoseconds
1959    ///
1960    /// Panics if no such metric.
1961    fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
1962        match metrics.sum_by_name(metric_name) {
1963            Some(v) => match v {
1964                MetricValue::PruningMetrics {
1965                    pruning_metrics, ..
1966                } => pruning_metrics.pruned(),
1967                _ => v.as_usize(),
1968            },
1969            _ => {
1970                panic!(
1971                    "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1972                );
1973            }
1974        }
1975    }
1976
1977    fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
1978        match metrics.sum_by_name(metric_name) {
1979            Some(MetricValue::PruningMetrics {
1980                pruning_metrics, ..
1981            }) => (pruning_metrics.pruned(), pruning_metrics.matched()),
1982            Some(_) => panic!(
1983                "Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}"
1984            ),
1985            None => panic!(
1986                "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1987            ),
1988        }
1989    }
1990
1991    fn populate_csv_partitions(
1992        tmp_dir: &TempDir,
1993        partition_count: usize,
1994        file_extension: &str,
1995    ) -> Result<SchemaRef> {
1996        // define schema for data source (csv file)
1997        let schema = Arc::new(Schema::new(vec![
1998            Field::new("c1", DataType::UInt32, false),
1999            Field::new("c2", DataType::UInt64, false),
2000            Field::new("c3", DataType::Boolean, false),
2001        ]));
2002
2003        // generate a partitioned file
2004        for partition in 0..partition_count {
2005            let filename = format!("partition-{partition}.{file_extension}");
2006            let file_path = tmp_dir.path().join(filename);
2007            let mut file = File::create(file_path)?;
2008
2009            // generate some data
2010            for i in 0..=10 {
2011                let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
2012                file.write_all(data.as_bytes())?;
2013            }
2014        }
2015
2016        Ok(schema)
2017    }
2018
2019    #[tokio::test]
2020    async fn write_table_results() -> Result<()> {
2021        // create partitioned input file and context
2022        let tmp_dir = TempDir::new()?;
2023        // let mut ctx = create_ctx(&tmp_dir, 4).await?;
2024        let ctx = SessionContext::new_with_config(
2025            SessionConfig::new().with_target_partitions(8),
2026        );
2027        let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
2028        // register csv file with the execution context
2029        ctx.register_csv(
2030            "test",
2031            tmp_dir.path().to_str().unwrap(),
2032            CsvReadOptions::new().schema(&schema),
2033        )
2034        .await?;
2035
2036        // register a local file system object store for /tmp directory
2037        let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
2038        let local_url = Url::parse("file://local").unwrap();
2039        ctx.register_object_store(&local_url, local);
2040
2041        // Configure listing options
2042        let file_format = ParquetFormat::default().with_enable_pruning(true);
2043        let listing_options = ListingOptions::new(Arc::new(file_format))
2044            .with_file_extension(ParquetFormat::default().get_ext());
2045
2046        // execute a simple query and write the results to parquet
2047        let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
2048        fs::create_dir(&out_dir).unwrap();
2049        let df = ctx.sql("SELECT c1, c2 FROM test").await?;
2050        let schema = Arc::clone(df.schema().inner());
2051        // Register a listing table - this will use all files in the directory as data sources
2052        // for the query
2053        ctx.register_listing_table(
2054            "my_table",
2055            &out_dir,
2056            listing_options,
2057            Some(schema),
2058            None,
2059        )
2060        .await
2061        .unwrap();
2062        df.write_table("my_table", DataFrameWriteOptions::new())
2063            .await?;
2064
2065        // create a new context and verify that the results were saved to a partitioned parquet file
2066        let ctx = SessionContext::new();
2067
2068        // get write_id
2069        let mut paths = fs::read_dir(&out_dir).unwrap();
2070        let path = paths.next();
2071        let name = path
2072            .unwrap()?
2073            .path()
2074            .file_name()
2075            .expect("Should be a file name")
2076            .to_str()
2077            .expect("Should be a str")
2078            .to_owned();
2079        let (parsed_id, _) = name.split_once('_').expect("File should contain _ !");
2080        let write_id = parsed_id.to_owned();
2081
2082        // register each partition as well as the top level dir
2083        ctx.register_parquet(
2084            "part0",
2085            &format!("{out_dir}/{write_id}_0.parquet"),
2086            ParquetReadOptions::default(),
2087        )
2088        .await?;
2089
2090        ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
2091            .await?;
2092
2093        let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
2094        let allparts = ctx
2095            .sql("SELECT c1, c2 FROM allparts")
2096            .await?
2097            .collect()
2098            .await?;
2099
2100        let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
2101
2102        assert_eq!(part0[0].schema(), allparts[0].schema());
2103
2104        assert_eq!(allparts_count, 40);
2105
2106        Ok(())
2107    }
2108
2109    #[tokio::test]
2110    async fn test_struct_filter_parquet() -> Result<()> {
2111        let tmp_dir = TempDir::new()?;
2112        let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
2113        write_file(&path);
2114        let ctx = SessionContext::new();
2115        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
2116        ctx.register_listing_table("base_table", path, opt, None, None)
2117            .await
2118            .unwrap();
2119        let sql = "select * from base_table where name='test02'";
2120        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
2121        assert_eq!(batch.len(), 1);
2122        insta::assert_snapshot!(batches_to_string(&batch),@r###"
2123            +---------------------+----+--------+
2124            | struct              | id | name   |
2125            +---------------------+----+--------+
2126            | {id: 4, name: aaa2} | 2  | test02 |
2127            +---------------------+----+--------+
2128        "###);
2129        Ok(())
2130    }
2131
2132    #[tokio::test]
2133    async fn test_struct_filter_parquet_with_view_types() -> Result<()> {
2134        let tmp_dir = TempDir::new().unwrap();
2135        let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
2136        write_file(&path);
2137
2138        let ctx = SessionContext::new();
2139
2140        let mut options = TableParquetOptions::default();
2141        options.global.schema_force_view_types = true;
2142        let opt =
2143            ListingOptions::new(Arc::new(ParquetFormat::default().with_options(options)));
2144
2145        ctx.register_listing_table("base_table", path, opt, None, None)
2146            .await
2147            .unwrap();
2148        let sql = "select * from base_table where name='test02'";
2149        let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
2150        assert_eq!(batch.len(), 1);
2151        insta::assert_snapshot!(batches_to_string(&batch),@r###"
2152            +---------------------+----+--------+
2153            | struct              | id | name   |
2154            +---------------------+----+--------+
2155            | {id: 4, name: aaa2} | 2  | test02 |
2156            +---------------------+----+--------+
2157        "###);
2158        Ok(())
2159    }
2160
2161    fn write_file(file: &String) {
2162        let struct_fields = Fields::from(vec![
2163            Field::new("id", DataType::Int64, false),
2164            Field::new("name", DataType::Utf8, false),
2165        ]);
2166        let schema = Schema::new(vec![
2167            Field::new("struct", DataType::Struct(struct_fields.clone()), false),
2168            Field::new("id", DataType::Int64, true),
2169            Field::new("name", DataType::Utf8, false),
2170        ]);
2171        let id_array = Int64Array::from(vec![Some(1), Some(2)]);
2172        let columns = vec![
2173            Arc::new(Int64Array::from(vec![3, 4])) as _,
2174            Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
2175        ];
2176        let struct_array = StructArray::new(struct_fields, columns, None);
2177
2178        let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
2179        let schema = Arc::new(schema);
2180
2181        let batch = RecordBatch::try_new(
2182            schema.clone(),
2183            vec![
2184                Arc::new(struct_array),
2185                Arc::new(id_array),
2186                Arc::new(name_array),
2187            ],
2188        )
2189        .unwrap();
2190        let file = File::create(file).unwrap();
2191        let w_opt = WriterProperties::builder().build();
2192        let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
2193        writer.write(&batch).unwrap();
2194        writer.flush().unwrap();
2195        writer.close().unwrap();
2196    }
2197
2198    /// Write out a batch to a parquet file and return the total size of the file
2199    async fn write_batch(
2200        path: &str,
2201        store: Arc<dyn ObjectStore>,
2202        batch: RecordBatch,
2203    ) -> u64 {
2204        let mut writer =
2205            ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
2206        writer.write(&batch).unwrap();
2207        writer.flush().unwrap();
2208        let bytes = writer.into_inner().unwrap().into_inner().freeze();
2209        let total_size = bytes.len() as u64;
2210        let path = Path::from(path);
2211        let payload = object_store::PutPayload::from_bytes(bytes);
2212        store
2213            .put_opts(&path, payload, object_store::PutOptions::default())
2214            .await
2215            .unwrap();
2216        total_size
2217    }
2218
2219    /// A ParquetFileReaderFactory that tracks the metadata_size_hint passed to it
2220    #[derive(Debug, Clone)]
2221    struct TrackingParquetFileReaderFactory {
2222        inner: Arc<dyn ParquetFileReaderFactory>,
2223        metadata_size_hint_calls: Arc<Mutex<Vec<Option<usize>>>>,
2224    }
2225
2226    impl TrackingParquetFileReaderFactory {
2227        fn new(store: Arc<dyn ObjectStore>) -> Self {
2228            Self {
2229                inner: Arc::new(DefaultParquetFileReaderFactory::new(store)) as _,
2230                metadata_size_hint_calls: Arc::new(Mutex::new(vec![])),
2231            }
2232        }
2233    }
2234
2235    impl ParquetFileReaderFactory for TrackingParquetFileReaderFactory {
2236        fn create_reader(
2237            &self,
2238            partition_index: usize,
2239            partitioned_file: PartitionedFile,
2240            metadata_size_hint: Option<usize>,
2241            metrics: &ExecutionPlanMetricsSet,
2242        ) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
2243        {
2244            self.metadata_size_hint_calls
2245                .lock()
2246                .unwrap()
2247                .push(metadata_size_hint);
2248            self.inner.create_reader(
2249                partition_index,
2250                partitioned_file,
2251                metadata_size_hint,
2252                metrics,
2253            )
2254        }
2255    }
2256
2257    /// Test passing `metadata_size_hint` to either a single file or the whole exec
2258    #[tokio::test]
2259    async fn test_metadata_size_hint() {
2260        let store =
2261            Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>;
2262        let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
2263
2264        let ctx = SessionContext::new();
2265        ctx.register_object_store(store_url.as_ref(), store.clone());
2266
2267        // write some data out, it doesn't matter what it is
2268        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
2269        let batch = create_batch(vec![("c1", c1)]);
2270        let schema = batch.schema();
2271        let name_1 = "test1.parquet";
2272        let name_2 = "test2.parquet";
2273        let total_size_1 = write_batch(name_1, store.clone(), batch.clone()).await;
2274        let total_size_2 = write_batch(name_2, store.clone(), batch.clone()).await;
2275
2276        let reader_factory =
2277            Arc::new(TrackingParquetFileReaderFactory::new(store.clone()));
2278
2279        let size_hint_calls = reader_factory.metadata_size_hint_calls.clone();
2280
2281        let source = Arc::new(
2282            ParquetSource::default()
2283                .with_parquet_file_reader_factory(reader_factory)
2284                .with_metadata_size_hint(456),
2285        );
2286        let config = FileScanConfigBuilder::new(store_url, schema, source)
2287            .with_file(
2288                PartitionedFile {
2289                    object_meta: ObjectMeta {
2290                        location: Path::from(name_1),
2291                        last_modified: Utc::now(),
2292                        size: total_size_1,
2293                        e_tag: None,
2294                        version: None,
2295                    },
2296                    partition_values: vec![],
2297                    range: None,
2298                    statistics: None,
2299                    extensions: None,
2300                    metadata_size_hint: None,
2301                }
2302                .with_metadata_size_hint(123),
2303            )
2304            .with_file(PartitionedFile {
2305                object_meta: ObjectMeta {
2306                    location: Path::from(name_2),
2307                    last_modified: Utc::now(),
2308                    size: total_size_2,
2309                    e_tag: None,
2310                    version: None,
2311                },
2312                partition_values: vec![],
2313                range: None,
2314                statistics: None,
2315                extensions: None,
2316                metadata_size_hint: None,
2317            })
2318            .build();
2319
2320        let exec = DataSourceExec::from_data_source(config);
2321
2322        let res = collect(exec, ctx.task_ctx()).await.unwrap();
2323        assert_eq!(res.len(), 2);
2324
2325        let calls = size_hint_calls.lock().unwrap().clone();
2326        assert_eq!(calls.len(), 2);
2327        assert_eq!(calls, vec![Some(123), Some(456)]);
2328    }
2329}