1pub use datafusion_datasource_parquet::*;
23
24#[cfg(test)]
25mod tests {
26 use std::fs::{self, File};
28 use std::io::Write;
29 use std::sync::Arc;
30 use std::sync::Mutex;
31
32 use crate::dataframe::DataFrameWriteOptions;
33 use crate::datasource::file_format::options::CsvReadOptions;
34 use crate::datasource::file_format::parquet::test_util::store_parquet;
35 use crate::datasource::file_format::test_util::scan_format;
36 use crate::datasource::listing::ListingOptions;
37 use crate::execution::context::SessionState;
38 use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
39 use crate::test::object_store::local_unpartitioned_file;
40 use arrow::array::{
41 ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
42 StringViewArray, StructArray, TimestampNanosecondArray,
43 };
44 use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
45 use arrow::record_batch::RecordBatch;
46 use arrow::util::pretty::pretty_format_batches;
47 use arrow_schema::{SchemaRef, TimeUnit};
48 use bytes::{BufMut, BytesMut};
49 use datafusion_common::config::TableParquetOptions;
50 use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
51 use datafusion_common::{assert_contains, Result, ScalarValue};
52 use datafusion_datasource::file_format::FileFormat;
53 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
54 use datafusion_datasource::source::DataSourceExec;
55
56 use datafusion_datasource::file::FileSource;
57 use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
58 use datafusion_datasource_parquet::source::ParquetSource;
59 use datafusion_datasource_parquet::{
60 DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
61 };
62 use datafusion_execution::object_store::ObjectStoreUrl;
63 use datafusion_expr::{col, lit, when, Expr};
64 use datafusion_physical_expr::planner::logical2physical;
65 use datafusion_physical_plan::analyze::AnalyzeExec;
66 use datafusion_physical_plan::collect;
67 use datafusion_physical_plan::metrics::{
68 ExecutionPlanMetricsSet, MetricType, MetricValue, MetricsSet,
69 };
70 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
71
72 use chrono::{TimeZone, Utc};
73 use datafusion_datasource::file_groups::FileGroup;
74 use futures::StreamExt;
75 use insta;
76 use insta::assert_snapshot;
77 use object_store::local::LocalFileSystem;
78 use object_store::path::Path;
79 use object_store::{ObjectMeta, ObjectStore};
80 use parquet::arrow::ArrowWriter;
81 use parquet::file::properties::WriterProperties;
82 use tempfile::TempDir;
83 use url::Url;
84
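// Results produced by a `RoundTrip` run: the collected record batches, the
// formatted EXPLAIN ANALYZE output, and the `DataSourceExec` that was executed.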
85 struct RoundTripResult {
86 batches: Result<Vec<RecordBatch>>,
88 explain: Result<String>,
90 parquet_exec: Arc<DataSourceExec>,
92 }
93
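// Test harness that writes the given batches to Parquet and scans them back
// through a `ParquetSource` configured with the options below (projection,
// table schema, predicate, and the pruning/pushdown toggles).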
94 #[derive(Debug, Default)]
98 struct RoundTrip {
99 projection: Option<Vec<usize>>,
100 table_schema: Option<SchemaRef>,
105 predicate: Option<Expr>,
106 pushdown_predicate: bool,
107 page_index_predicate: bool,
108 bloom_filters: bool,
109 }
110
111 impl RoundTrip {
112 fn new() -> Self {
113 Default::default()
114 }
115
116 fn with_projection(mut self, projection: Vec<usize>) -> Self {
117 self.projection = Some(projection);
118 self
119 }
120
121 fn with_table_schema(mut self, schema: SchemaRef) -> Self {
125 self.table_schema = Some(schema);
126 self
127 }
128
129 fn with_predicate(mut self, predicate: Expr) -> Self {
130 self.predicate = Some(predicate);
131 self
132 }
133
134 fn with_pushdown_predicate(mut self) -> Self {
135 self.pushdown_predicate = true;
136 self
137 }
138
139 fn with_page_index_predicate(mut self) -> Self {
140 self.page_index_predicate = true;
141 self
142 }
143
144 fn with_bloom_filters(mut self) -> Self {
145 self.bloom_filters = true;
146 self
147 }
148
149 async fn round_trip_to_batches(
151 self,
152 batches: Vec<RecordBatch>,
153 ) -> Result<Vec<RecordBatch>> {
154 self.round_trip(batches).await.batches
155 }
156
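// Builds the `ParquetSource` for this round trip, converting the logical
// predicate to a physical expression and applying the configured filter
// pushdown, page index, and bloom filter settings.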
157 fn build_file_source(&self, table_schema: SchemaRef) -> Arc<dyn FileSource> {
158 let predicate = self
160 .predicate
161 .as_ref()
162 .map(|p| logical2physical(p, &table_schema));
163
164 let mut source = ParquetSource::default();
165 if let Some(predicate) = predicate {
166 source = source.with_predicate(predicate);
167 }
168
169 if self.pushdown_predicate {
170 source = source
171 .with_pushdown_filters(true)
172 .with_reorder_filters(true);
173 } else {
174 source = source.with_pushdown_filters(false);
175 }
176
177 if self.page_index_predicate {
178 source = source.with_enable_page_index(true);
179 } else {
180 source = source.with_enable_page_index(false);
181 }
182
183 if self.bloom_filters {
184 source = source.with_bloom_filter_on_read(true);
185 } else {
186 source = source.with_bloom_filter_on_read(false);
187 }
188
189 source.with_schema(TableSchema::new(Arc::clone(&table_schema), vec![]))
190 }
191
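// Wraps the file source in a `FileScanConfig` (with the optional projection)
// and turns it into a `DataSourceExec` over the given file group.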
192 fn build_parquet_exec(
193 &self,
194 file_schema: SchemaRef,
195 file_group: FileGroup,
196 source: Arc<dyn FileSource>,
197 ) -> Arc<DataSourceExec> {
198 let base_config = FileScanConfigBuilder::new(
199 ObjectStoreUrl::local_filesystem(),
200 file_schema,
201 source,
202 )
203 .with_file_group(file_group)
204 .with_projection_indices(self.projection.clone())
205 .build();
206 DataSourceExec::from_data_source(base_config)
207 }
208
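// Writes the batches out as Parquet, scans them back with the configured
// options, and also runs the same scan under an `AnalyzeExec` so tests can
// inspect the EXPLAIN ANALYZE output alongside the results.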
209 async fn round_trip(&self, batches: Vec<RecordBatch>) -> RoundTripResult {
215 let table_schema = match &self.table_schema {
218 Some(schema) => schema,
219 None => &Arc::new(
220 Schema::try_merge(
221 batches.iter().map(|b| b.schema().as_ref().clone()),
222 )
223 .unwrap(),
224 ),
225 };
226 let multi_page = self.page_index_predicate;
229 let (meta, _files) = store_parquet(batches, multi_page).await.unwrap();
230 let file_group: FileGroup = meta.into_iter().map(Into::into).collect();
231
232 let parquet_source = self.build_file_source(Arc::clone(table_schema));
234 let parquet_exec = self.build_parquet_exec(
235 Arc::clone(table_schema),
236 file_group.clone(),
237 Arc::clone(&parquet_source),
238 );
239
240 let analyze_exec = Arc::new(AnalyzeExec::new(
241 false,
242 false,
243 vec![MetricType::SUMMARY, MetricType::DEV],
244 self.build_parquet_exec(
246 Arc::clone(table_schema),
247 file_group.clone(),
248 self.build_file_source(Arc::clone(table_schema)),
249 ),
250 Arc::new(Schema::new(vec![
251 Field::new("plan_type", DataType::Utf8, true),
252 Field::new("plan", DataType::Utf8, true),
253 ])),
254 ));
255
256 let session_ctx = SessionContext::new();
257 let task_ctx = session_ctx.task_ctx();
258
259 let batches = collect(
260 Arc::clone(&parquet_exec) as Arc<dyn ExecutionPlan>,
261 task_ctx.clone(),
262 )
263 .await;
264
265 let explain = collect(analyze_exec, task_ctx.clone())
266 .await
267 .map(|batches| {
268 let batches = pretty_format_batches(&batches).unwrap();
269 format!("{batches}")
270 });
271
272 RoundTripResult {
273 batches,
274 explain,
275 parquet_exec,
276 }
277 }
278 }
279
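// Returns a new batch with `array` appended to `batch` as a nullable column
// named `field_name`.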
280 fn add_to_batch(
282 batch: &RecordBatch,
283 field_name: &str,
284 array: ArrayRef,
285 ) -> RecordBatch {
286 let mut fields = SchemaBuilder::from(batch.schema().fields());
287 fields.push(Field::new(field_name, array.data_type().clone(), true));
288 let schema = Arc::new(fields.finish());
289
290 let mut columns = batch.columns().to_vec();
291 columns.push(array);
292 RecordBatch::try_new(schema, columns).expect("error creating record batch")
293 }
294
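// Builds a `RecordBatch` from (column name, array) pairs, starting from an
// empty batch.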
295 fn create_batch(columns: Vec<(&str, ArrayRef)>) -> RecordBatch {
296 columns.into_iter().fold(
297 RecordBatch::new_empty(Arc::new(Schema::empty())),
298 |batch, (field_name, arr)| add_to_batch(&batch, field_name, arr.clone()),
299 )
300 }
301
302 #[tokio::test]
303 async fn test_pushdown_with_missing_column_in_file() {
304 let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
305
306 let file_schema =
307 Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
308
309 let table_schema = Arc::new(Schema::new(vec![
310 Field::new("c1", DataType::Int32, true),
311 Field::new("c2", DataType::Int32, true),
312 ]));
313
314 let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
315
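// c2 does not exist in the file, so it is filled with nulls when the
// predicate is evaluated; no row can satisfy c2 = 1.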
316 let filter = col("c2").eq(lit(1_i32));
320 let rt = RoundTrip::new()
321 .with_table_schema(table_schema.clone())
322 .with_predicate(filter.clone())
323 .with_pushdown_predicate()
324 .round_trip(vec![batch.clone()])
325 .await;
326 let total_rows = rt
327 .batches
328 .unwrap()
329 .iter()
330 .map(|b| b.num_rows())
331 .sum::<usize>();
332 assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
333 let metrics = rt.parquet_exec.metrics().unwrap();
334 let metric = get_value(&metrics, "pushdown_rows_pruned");
335 assert_eq!(metric, 3, "Expected all rows to be pruned");
336
337 let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
339 let rt = RoundTrip::new()
340 .with_table_schema(table_schema.clone())
341 .with_predicate(filter.clone())
342 .with_pushdown_predicate()
343 .round_trip(vec![batch.clone()])
344 .await;
345 let batches = rt.batches.unwrap();
346
347 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
348 +----+----+
349 | c1 | c2 |
350 +----+----+
351 | 1 | |
352 +----+----+
353 "###);
354
355 let metrics = rt.parquet_exec.metrics().unwrap();
356 let metric = get_value(&metrics, "pushdown_rows_pruned");
357 assert_eq!(metric, 2, "Expected 2 of the 3 rows to be pruned");
358 }
359
360 #[tokio::test]
361 async fn test_pushdown_with_missing_column_in_file_multiple_types() {
362 let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
363
364 let file_schema =
365 Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, true)]));
366
367 let table_schema = Arc::new(Schema::new(vec![
368 Field::new("c1", DataType::Int32, true),
369 Field::new("c2", DataType::Utf8, true),
370 ]));
371
372 let batch = RecordBatch::try_new(file_schema.clone(), vec![c1]).unwrap();
373
374 let filter = col("c2").eq(lit("abc"));
378 let rt = RoundTrip::new()
379 .with_table_schema(table_schema.clone())
380 .with_predicate(filter.clone())
381 .with_pushdown_predicate()
382 .round_trip(vec![batch.clone()])
383 .await;
384 let total_rows = rt
385 .batches
386 .unwrap()
387 .iter()
388 .map(|b| b.num_rows())
389 .sum::<usize>();
390 assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
391 let metrics = rt.parquet_exec.metrics().unwrap();
392 let metric = get_value(&metrics, "pushdown_rows_pruned");
393 assert_eq!(metric, 3, "Expected all rows to be pruned");
394
395 let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
397 let rt = RoundTrip::new()
398 .with_table_schema(table_schema.clone())
399 .with_predicate(filter.clone())
400 .with_pushdown_predicate()
401 .round_trip(vec![batch.clone()])
402 .await;
403 let batches = rt.batches.unwrap();
404
405 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
406 +----+----+
407 | c1 | c2 |
408 +----+----+
409 | 1 | |
410 +----+----+
411 "###);
412
413 let metrics = rt.parquet_exec.metrics().unwrap();
414 let metric = get_value(&metrics, "pushdown_rows_pruned");
415 assert_eq!(metric, 2, "Expected 2 of the 3 rows to be pruned");
416 }
417
418 #[tokio::test]
419 async fn test_pushdown_with_missing_middle_column() {
420 let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
421 let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
422
423 let file_schema = Arc::new(Schema::new(vec![
424 Field::new("c1", DataType::Int32, true),
425 Field::new("c3", DataType::Int32, true),
426 ]));
427
428 let table_schema = Arc::new(Schema::new(vec![
429 Field::new("c1", DataType::Int32, true),
430 Field::new("c2", DataType::Utf8, true),
431 Field::new("c3", DataType::Int32, true),
432 ]));
433
434 let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
435
436 let filter = col("c2").eq(lit("abc"));
440 let rt = RoundTrip::new()
441 .with_table_schema(table_schema.clone())
442 .with_predicate(filter.clone())
443 .with_pushdown_predicate()
444 .round_trip(vec![batch.clone()])
445 .await;
446 let total_rows = rt
447 .batches
448 .unwrap()
449 .iter()
450 .map(|b| b.num_rows())
451 .sum::<usize>();
452 assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
453 let metrics = rt.parquet_exec.metrics().unwrap();
454 let metric = get_value(&metrics, "pushdown_rows_pruned");
455 assert_eq!(metric, 3, "Expected all rows to be pruned");
456
457 let filter = col("c2").is_null().and(col("c1").eq(lit(1_i32)));
459 let rt = RoundTrip::new()
460 .with_table_schema(table_schema.clone())
461 .with_predicate(filter.clone())
462 .with_pushdown_predicate()
463 .round_trip(vec![batch.clone()])
464 .await;
465 let batches = rt.batches.unwrap();
466
467 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
468 +----+----+----+
469 | c1 | c2 | c3 |
470 +----+----+----+
471 | 1 | | 7 |
472 +----+----+----+
473 "###);
474
475 let metrics = rt.parquet_exec.metrics().unwrap();
476 let metric = get_value(&metrics, "pushdown_rows_pruned");
477 assert_eq!(metric, 2, "Expected 2 of the 3 rows to be pruned");
478 }
479
480 #[tokio::test]
481 async fn test_pushdown_with_file_column_order_mismatch() {
482 let c3 = Arc::new(Int32Array::from(vec![7, 8, 9]));
483
484 let file_schema = Arc::new(Schema::new(vec![
485 Field::new("c3", DataType::Int32, true),
486 Field::new("c3", DataType::Int32, true),
487 ]));
488
489 let table_schema = Arc::new(Schema::new(vec![
490 Field::new("c1", DataType::Int32, true),
491 Field::new("c2", DataType::Utf8, true),
492 Field::new("c3", DataType::Int32, true),
493 ]));
494
495 let batch =
496 RecordBatch::try_new(file_schema.clone(), vec![c3.clone(), c3]).unwrap();
497
498 let filter = col("c2").eq(lit("abc"));
502 let rt = RoundTrip::new()
503 .with_table_schema(table_schema.clone())
504 .with_predicate(filter.clone())
505 .with_pushdown_predicate()
506 .round_trip(vec![batch.clone()])
507 .await;
508 let total_rows = rt
509 .batches
510 .unwrap()
511 .iter()
512 .map(|b| b.num_rows())
513 .sum::<usize>();
514 assert_eq!(total_rows, 0, "Expected no rows to match the predicate");
515 let metrics = rt.parquet_exec.metrics().unwrap();
516 let metric = get_value(&metrics, "pushdown_rows_pruned");
517 assert_eq!(metric, 3, "Expected all rows to be pruned");
518
519 let filter = col("c2").is_null().and(col("c3").eq(lit(7_i32)));
521 let rt = RoundTrip::new()
522 .with_table_schema(table_schema.clone())
523 .with_predicate(filter.clone())
524 .with_pushdown_predicate()
525 .round_trip(vec![batch.clone()])
526 .await;
527 let batches = rt.batches.unwrap();
528
529 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
530 +----+----+----+
531 | c1 | c2 | c3 |
532 +----+----+----+
533 | | | 7 |
534 +----+----+----+
535 "###);
536
537 let metrics = rt.parquet_exec.metrics().unwrap();
538 let metric = get_value(&metrics, "pushdown_rows_pruned");
539 assert_eq!(metric, 2, "Expected 2 of the 3 rows to be pruned");
540 }
541
542 #[tokio::test]
543 async fn test_pushdown_with_missing_column_nested_conditions() {
544 let c1: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
546 let c3: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50]));
547
548 let file_schema = Arc::new(Schema::new(vec![
549 Field::new("c1", DataType::Int32, true),
550 Field::new("c3", DataType::Int32, true),
551 ]));
552
553 let table_schema = Arc::new(Schema::new(vec![
554 Field::new("c1", DataType::Int32, true),
555 Field::new("c2", DataType::Int32, true),
556 Field::new("c3", DataType::Int32, true),
557 ]));
558
559 let batch = RecordBatch::try_new(file_schema.clone(), vec![c1, c3]).unwrap();
560
561 let filter = col("c1")
565 .eq(lit(1_i32))
566 .or(col("c2").eq(lit(5_i32)))
567 .and(col("c3").eq(lit(10_i32)).or(col("c2").is_null()));
568
569 let rt = RoundTrip::new()
570 .with_table_schema(table_schema.clone())
571 .with_predicate(filter.clone())
572 .with_pushdown_predicate()
573 .round_trip(vec![batch.clone()])
574 .await;
575
576 let batches = rt.batches.unwrap();
577
578 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
579 +----+----+----+
580 | c1 | c2 | c3 |
581 +----+----+----+
582 | 1 | | 10 |
583 +----+----+----+
584 "###);
585
586 let metrics = rt.parquet_exec.metrics().unwrap();
587 let metric = get_value(&metrics, "pushdown_rows_pruned");
588 assert_eq!(metric, 4, "Expected 4 rows to be pruned");
589
590 let filter = col("c1")
595 .lt(lit(3_i32))
596 .and(col("c2").is_not_null())
597 .or(col("c3").gt(lit(20_i32)).and(col("c2").is_null()));
598
599 let rt = RoundTrip::new()
600 .with_table_schema(table_schema)
601 .with_predicate(filter.clone())
602 .with_pushdown_predicate()
603 .round_trip(vec![batch])
604 .await;
605
606 let batches = rt.batches.unwrap();
607
608 insta::assert_snapshot!(batches_to_sort_string(&batches),@r###"
609 +----+----+----+
610 | c1 | c2 | c3 |
611 +----+----+----+
612 | 3 | | 30 |
613 | 4 | | 40 |
614 | 5 | | 50 |
615 +----+----+----+
616 "###);
617
618 let metrics = rt.parquet_exec.metrics().unwrap();
619 let metric = get_value(&metrics, "pushdown_rows_pruned");
620 assert_eq!(metric, 2, "Expected 2 rows to be pruned");
621 }
622
623 #[tokio::test]
624 async fn evolved_schema() {
625 let c1: ArrayRef =
626 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
627 let batch1 =
629 add_to_batch(&RecordBatch::new_empty(Arc::new(Schema::empty())), "c1", c1);
630
631 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
633 let batch2 = add_to_batch(&batch1, "c2", c2);
634
635 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
637 let batch3 = add_to_batch(&batch1, "c3", c3);
638
639 let read = RoundTrip::new()
641 .round_trip_to_batches(vec![batch1, batch2, batch3])
642 .await
643 .unwrap();
644
645 insta::assert_snapshot!(batches_to_sort_string(&read), @r###"
646 +-----+----+----+
647 | c1 | c2 | c3 |
648 +-----+----+----+
649 | | | |
650 | | | 20 |
651 | | 2 | |
652 | Foo | | |
653 | Foo | | 10 |
654 | Foo | 1 | |
655 | bar | | |
656 | bar | | |
657 | bar | | |
658 +-----+----+----+
659 "###);
660 }
661
662 #[tokio::test]
663 async fn evolved_schema_inconsistent_order() {
664 let c1: ArrayRef =
665 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
666
667 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
668
669 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
670
671 let batch1 = create_batch(vec![
673 ("c1", c1.clone()),
674 ("c2", c2.clone()),
675 ("c3", c3.clone()),
676 ]);
677
678 let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
680
681 let read = RoundTrip::new()
683 .round_trip_to_batches(vec![batch1, batch2])
684 .await
685 .unwrap();
686
687 insta::assert_snapshot!(batches_to_sort_string(&read),@r"
688 +-----+----+----+
689 | c1 | c2 | c3 |
690 +-----+----+----+
691 | | 2 | 20 |
692 | | 2 | 20 |
693 | Foo | 1 | 10 |
694 | Foo | 1 | 10 |
695 | bar | | |
696 | bar | | |
697 +-----+----+----+
698 ");
699 }
700
701 #[tokio::test]
702 async fn evolved_schema_intersection() {
703 let c1: ArrayRef =
704 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
705
706 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
707
708 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
709
710 let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
712
713 let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
715
716 let read = RoundTrip::new()
718 .round_trip_to_batches(vec![batch1, batch2])
719 .await
720 .unwrap();
721
722 insta::assert_snapshot!(batches_to_sort_string(&read),@r"
723 +-----+----+----+
724 | c1 | c3 | c2 |
725 +-----+----+----+
726 | | | |
727 | | 10 | 1 |
728 | | 20 | |
729 | | 20 | 2 |
730 | Foo | 10 | |
731 | bar | | |
732 +-----+----+----+
733 ");
734 }
735
736 #[tokio::test]
737 async fn evolved_schema_intersection_filter() {
738 let c1: ArrayRef =
739 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
740
741 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
742
743 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
744
745 let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
747
748 let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
750
751 let filter = col("c2").eq(lit(2_i64));
752
753 let read = RoundTrip::new()
755 .with_predicate(filter)
756 .round_trip_to_batches(vec![batch1, batch2])
757 .await
758 .unwrap();
759
760 insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
761 +-----+----+----+
762 | c1 | c3 | c2 |
763 +-----+----+----+
764 | | | |
765 | | 10 | 1 |
766 | | 20 | |
767 | | 20 | 2 |
768 | Foo | 10 | |
769 | bar | | |
770 +-----+----+----+
771 "###);
772 }
773
774 #[tokio::test]
775 async fn evolved_schema_intersection_filter_with_filter_pushdown() {
776 let c1: ArrayRef =
777 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
778 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
779 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
780 let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);
782 let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
784 let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
785 let rt = RoundTrip::new()
787 .with_predicate(filter)
788 .with_pushdown_predicate()
789 .round_trip(vec![batch1, batch2])
790 .await;
791
792 insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
793 +----+----+----+
794 | c1 | c3 | c2 |
795 +----+----+----+
796 | | 10 | 1 |
797 | | 20 | 2 |
798 +----+----+----+
799 "###);
800 let metrics = rt.parquet_exec.metrics().unwrap();
801 assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 4);
803 assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
804 }
805
806 #[tokio::test]
807 async fn evolved_schema_projection() {
808 let c1: ArrayRef =
809 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
810
811 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
812
813 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
814
815 let c4: ArrayRef =
816 Arc::new(StringArray::from(vec![Some("baz"), Some("boo"), None]));
817
818 let batch1 = create_batch(vec![
820 ("c1", c1.clone()),
821 ("c2", c2.clone()),
822 ("c3", c3.clone()),
823 ]);
824
825 let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);
827
828 let read = RoundTrip::new()
830 .with_projection(vec![0, 3])
831 .round_trip_to_batches(vec![batch1, batch2])
832 .await
833 .unwrap();
834
835 insta::assert_snapshot!(batches_to_sort_string(&read), @r###"
836 +-----+-----+
837 | c1 | c4 |
838 +-----+-----+
839 | | |
840 | | boo |
841 | Foo | |
842 | Foo | baz |
843 | bar | |
844 | bar | |
845 +-----+-----+
846 "###);
847 }
848
849 #[tokio::test]
850 async fn evolved_schema_column_order_filter() {
851 let c1: ArrayRef =
852 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
853
854 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
855
856 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
857
858 let batch1 = create_batch(vec![
860 ("c1", c1.clone()),
861 ("c2", c2.clone()),
862 ("c3", c3.clone()),
863 ]);
864
865 let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);
867
868 let filter = col("c3").eq(lit(0_i8));
869
870 let read = RoundTrip::new()
872 .with_predicate(filter)
873 .round_trip_to_batches(vec![batch1, batch2])
874 .await
875 .unwrap();
876
877 assert_eq!(read.len(), 0);
879 }
880
881 #[tokio::test]
882 async fn evolved_schema_column_type_filter_strings() {
883 let c1: ArrayRef =
885 Arc::new(StringViewArray::from(vec![Some("foo"), Some("bar")]));
886 let batch = create_batch(vec![("c1", c1.clone())]);
887
888 let table_schema =
890 Arc::new(Schema::new(vec![Field::new("c1", DataType::Utf8, false)]));
891
892 let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("aaa".to_string()))));
894 let rt = RoundTrip::new()
895 .with_predicate(filter)
896 .with_table_schema(table_schema.clone())
897 .round_trip(vec![batch.clone()])
898 .await;
899 let metrics = rt.parquet_exec.metrics().unwrap();
901 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
902 assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
903 assert_eq!(rt.batches.unwrap().len(), 0);
904
905 let filter = col("c1").eq(lit(ScalarValue::Utf8(Some("foo".to_string()))));
907 let rt = RoundTrip::new()
908 .with_predicate(filter)
909 .with_table_schema(table_schema)
910 .round_trip(vec![batch])
911 .await;
912 let metrics = rt.parquet_exec.metrics().unwrap();
914 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
915 assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 0);
916 let read = rt
917 .batches
918 .unwrap()
919 .iter()
920 .map(|b| b.num_rows())
921 .sum::<usize>();
922 assert_eq!(read, 2, "Expected 2 rows to match the predicate");
923 }
924
925 #[tokio::test]
926 async fn evolved_schema_column_type_filter_ints() {
927 let c1: ArrayRef = Arc::new(Int8Array::from(vec![Some(1), Some(2)]));
929 let batch = create_batch(vec![("c1", c1.clone())]);
930
931 let table_schema =
932 Arc::new(Schema::new(vec![Field::new("c1", DataType::UInt64, false)]));
933
934 let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(5))));
936 let rt = RoundTrip::new()
937 .with_predicate(filter)
938 .with_table_schema(table_schema.clone())
939 .round_trip(vec![batch.clone()])
940 .await;
941 let metrics = rt.parquet_exec.metrics().unwrap();
943 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
944 assert_eq!(rt.batches.unwrap().len(), 0);
945
946 let filter = col("c1").eq(lit(ScalarValue::UInt64(Some(1))));
948 let rt = RoundTrip::new()
949 .with_predicate(filter)
950 .with_table_schema(table_schema)
951 .round_trip(vec![batch])
952 .await;
953 let metrics = rt.parquet_exec.metrics().unwrap();
955 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
956 let read = rt
957 .batches
958 .unwrap()
959 .iter()
960 .map(|b| b.num_rows())
961 .sum::<usize>();
962 assert_eq!(read, 2, "Expected 2 rows to match the predicate");
963 }
964
965 #[tokio::test]
966 async fn evolved_schema_column_type_filter_timestamp_units() {
967 let c1: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![
970 Some(1_000_000_000),
971 Some(2_000_000_000),
972 Some(3_000_000_000),
973 Some(4_000_000_000),
974 ]));
975 let batch = create_batch(vec![("c1", c1.clone())]);
976 let table_schema = Arc::new(Schema::new(vec![Field::new(
977 "c1",
978 DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
979 false,
980 )]));
981 let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
983 Some(1_000),
984 Some("UTC".into()),
985 )));
986 let rt = RoundTrip::new()
987 .with_predicate(filter)
988 .with_pushdown_predicate()
989 .with_page_index_predicate()
990 .with_table_schema(table_schema.clone())
991 .round_trip(vec![batch.clone()])
992 .await;
993 let metrics = rt.parquet_exec.metrics().unwrap();
995 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
996 let read = rt
997 .batches
998 .unwrap()
999 .iter()
1000 .map(|b| b.num_rows())
1001 .sum::<usize>();
1002 assert_eq!(read, 1, "Expected 1 row to match the predicate");
1003 assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
1004 assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
1005 assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
1006 let filter = col("c1").eq(lit(ScalarValue::TimestampMillisecond(
1009 Some(5_000),
1010 Some("UTC".into()),
1011 )));
1012 let rt = RoundTrip::new()
1013 .with_predicate(filter)
1014 .with_pushdown_predicate()
1015 .with_table_schema(table_schema)
1016 .round_trip(vec![batch])
1017 .await;
1018 let metrics = rt.parquet_exec.metrics().unwrap();
1020 assert_eq!(get_value(&metrics, "predicate_evaluation_errors"), 0);
1021 let read = rt
1022 .batches
1023 .unwrap()
1024 .iter()
1025 .map(|b| b.num_rows())
1026 .sum::<usize>();
1027 assert_eq!(read, 0, "Expected 0 rows to match the predicate");
1028 assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 1);
1029 }
1030
1031 #[tokio::test]
1032 async fn evolved_schema_disjoint_schema_filter() {
1033 let c1: ArrayRef =
1034 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1035
1036 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1037
1038 let batch1 = create_batch(vec![("c1", c1.clone())]);
1040
1041 let batch2 = create_batch(vec![("c2", c2)]);
1043
1044 let filter = col("c2").eq(lit(1_i64));
1045
1046 let read = RoundTrip::new()
1048 .with_predicate(filter)
1049 .round_trip_to_batches(vec![batch1, batch2])
1050 .await
1051 .unwrap();
1052
1053 insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1060 +-----+----+
1061 | c1 | c2 |
1062 +-----+----+
1063 | | |
1064 | | |
1065 | | 1 |
1066 | | 2 |
1067 | Foo | |
1068 | bar | |
1069 +-----+----+
1070 "###);
1071 }
1072
1073 #[tokio::test]
1074 async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
1075 let c1: ArrayRef =
1076 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1077
1078 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1079
1080 let batch1 = create_batch(vec![("c1", c1.clone())]);
1082
1083 let batch2 = create_batch(vec![("c2", c2)]);
1085
1086 let filter = col("c2").eq(lit(1_i64));
1087
1088 let rt = RoundTrip::new()
1090 .with_predicate(filter)
1091 .with_pushdown_predicate()
1092 .round_trip(vec![batch1, batch2])
1093 .await;
1094
1095 insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
1096 +----+----+
1097 | c1 | c2 |
1098 +----+----+
1099 | | 1 |
1100 +----+----+
1101 "###);
1102 let metrics = rt.parquet_exec.metrics().unwrap();
1103 assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
1105 assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 1);
1106 }
1107
1108 #[tokio::test]
1109 async fn evolved_schema_disjoint_schema_with_page_index_pushdown() {
1110 let c1: ArrayRef = Arc::new(StringArray::from(vec![
1111 Some("Foo"),
1113 Some("Bar"),
1114 Some("Foo2"),
1116 Some("Bar2"),
1117 Some("Foo3"),
1119 Some("Bar3"),
1120 ]));
1121
1122 let c2: ArrayRef = Arc::new(Int64Array::from(vec![
1123 Some(1),
1125 Some(2),
1126 Some(3),
1128 Some(4),
1129 Some(5),
1131 None,
1132 ]));
1133
1134 let batch1 = create_batch(vec![("c1", c1.clone())]);
1136
1137 let batch2 = create_batch(vec![("c2", c2.clone())]);
1139
1140 let batch3 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1142
1143 let batch4 = create_batch(vec![("c2", c2), ("c1", c1)]);
1145
1146 let filter = col("c2").eq(lit(1_i64));
1147
1148 let rt = RoundTrip::new()
1150 .with_predicate(filter)
1151 .with_page_index_predicate()
1152 .round_trip(vec![batch1, batch2, batch3, batch4])
1153 .await;
1154
1155 insta::assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()), @r###"
1156 +------+----+
1157 | c1 | c2 |
1158 +------+----+
1159 | | 1 |
1160 | | 2 |
1161 | Bar | |
1162 | Bar | 2 |
1163 | Bar | 2 |
1164 | Bar2 | |
1165 | Bar3 | |
1166 | Foo | |
1167 | Foo | 1 |
1168 | Foo | 1 |
1169 | Foo2 | |
1170 | Foo3 | |
1171 +------+----+
1172 "###);
1173 let metrics = rt.parquet_exec.metrics().unwrap();
1174
1175 let (page_index_pruned, page_index_matched) =
1179 get_pruning_metric(&metrics, "page_index_rows_pruned");
1180 assert_eq!(page_index_pruned, 12);
1181 assert_eq!(page_index_matched, 6);
1182 }
1183
1184 #[tokio::test]
1185 async fn multi_column_predicate_pushdown() {
1186 let c1: ArrayRef =
1187 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1188
1189 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1190
1191 let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1192
1193 let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
1195
1196 let read = RoundTrip::new()
1198 .with_predicate(filter)
1199 .with_pushdown_predicate()
1200 .round_trip_to_batches(vec![batch1])
1201 .await
1202 .unwrap();
1203
1204 insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1205 +-----+----+
1206 | c1 | c2 |
1207 +-----+----+
1208 | Foo | 1 |
1209 | bar | |
1210 +-----+----+
1211 "###);
1212 }
1213
1214 #[tokio::test]
1215 async fn multi_column_predicate_pushdown_page_index_pushdown() {
1216 let c1: ArrayRef =
1217 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1218
1219 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1220
1221 let batch1 = create_batch(vec![("c1", c1.clone()), ("c2", c2.clone())]);
1222
1223 let filter = col("c2").eq(lit(1_i64)).or(col("c1").eq(lit("bar")));
1225
1226 let read = RoundTrip::new()
1228 .with_predicate(filter)
1229 .with_page_index_predicate()
1230 .round_trip_to_batches(vec![batch1])
1231 .await
1232 .unwrap();
1233
1234 insta::assert_snapshot!(batches_to_sort_string(&read),@r###"
1235 +-----+----+
1236 | c1 | c2 |
1237 +-----+----+
1238 | | 2 |
1239 | Foo | 1 |
1240 | bar | |
1241 +-----+----+
1242 "###);
1243 }
1244
1245 #[tokio::test]
1246 async fn evolved_schema_incompatible_types() {
1247 let c1: ArrayRef =
1248 Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
1249
1250 let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
1251
1252 let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));
1253
1254 let c4: ArrayRef = Arc::new(Date64Array::from(vec![
1255 Some(86400000),
1256 None,
1257 Some(259200000),
1258 ]));
1259
1260 let batch1 = create_batch(vec![
1262 ("c1", c1.clone()),
1263 ("c2", c2.clone()),
1264 ("c3", c3.clone()),
1265 ]);
1266
1267 let batch2 = create_batch(vec![("c3", c4), ("c2", c2), ("c1", c1)]);
1269
1270 let table_schema = Schema::new(vec![
1271 Field::new("c1", DataType::Utf8, true),
1272 Field::new("c2", DataType::Int64, true),
1273 Field::new("c3", DataType::Int8, true),
1274 ]);
1275
1276 let read = RoundTrip::new()
1278 .with_table_schema(Arc::new(table_schema))
1279 .round_trip_to_batches(vec![batch1, batch2])
1280 .await;
1281 assert_contains!(read.unwrap_err().to_string(),
1282 "Cannot cast file schema field c3 of type Date64 to table schema field of type Int8");
1283 }
1284
1285 #[tokio::test]
1286 async fn parquet_exec_with_projection() -> Result<()> {
1287 let testdata = datafusion_common::test_util::parquet_test_data();
1288 let filename = "alltypes_plain.parquet";
1289 let session_ctx = SessionContext::new();
1290 let state = session_ctx.state();
1291 let task_ctx = state.task_ctx();
1292 let parquet_exec = scan_format(
1293 &state,
1294 &ParquetFormat::default(),
1295 None,
1296 &testdata,
1297 filename,
1298 Some(vec![0, 1, 2]),
1299 None,
1300 )
1301 .await
1302 .unwrap();
1303 assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1304
1305 let mut results = parquet_exec.execute(0, task_ctx)?;
1306 let batch = results.next().await.unwrap()?;
1307
1308 assert_eq!(8, batch.num_rows());
1309 assert_eq!(3, batch.num_columns());
1310
1311 let schema = batch.schema();
1312 let field_names: Vec<&str> =
1313 schema.fields().iter().map(|f| f.name().as_str()).collect();
1314 assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names);
1315
1316 let batch = results.next().await;
1317 assert!(batch.is_none());
1318
1319 let batch = results.next().await;
1320 assert!(batch.is_none());
1321
1322 let batch = results.next().await;
1323 assert!(batch.is_none());
1324
1325 Ok(())
1326 }
1327
1328 #[tokio::test]
1329 async fn parquet_exec_with_int96_from_spark() -> Result<()> {
1330 let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
1335 let testdata = datafusion_common::test_util::parquet_test_data();
1336 let filename = "int96_from_spark.parquet";
1337 let session_ctx = SessionContext::new();
1338 let state = session_ctx.state();
1339 let task_ctx = state.task_ctx();
1340
1341 let time_units_and_expected = vec![
1342 (
1343 None,
1344 Arc::new(Int64Array::from(vec![
1345 Some(1704141296123456000),
1346 Some(1704070800000000000),
1347 Some(-4852191831933722624),
1348 Some(1735599600000000000),
1349 None,
1350 Some(-4864435138808946688),
1351 ])),
1352 ),
1353 (
1354 Some("ns".to_string()),
1355 Arc::new(Int64Array::from(vec![
1356 Some(1704141296123456000),
1357 Some(1704070800000000000),
1358 Some(-4852191831933722624),
1359 Some(1735599600000000000),
1360 None,
1361 Some(-4864435138808946688),
1362 ])),
1363 ),
1364 (
1365 Some("us".to_string()),
1366 Arc::new(Int64Array::from(vec![
1367 Some(1704141296123456),
1368 Some(1704070800000000),
1369 Some(253402225200000000),
1370 Some(1735599600000000),
1371 None,
1372 Some(9089380393200000000),
1373 ])),
1374 ),
1375 ];
1376
1377 for (time_unit, expected) in time_units_and_expected {
1378 let parquet_exec = scan_format(
1379 &state,
1380 &ParquetFormat::default().with_coerce_int96(time_unit.clone()),
1381 Some(schema.clone()),
1382 &testdata,
1383 filename,
1384 Some(vec![0]),
1385 None,
1386 )
1387 .await
1388 .unwrap();
1389 assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1390
1391 let mut results = parquet_exec.execute(0, task_ctx.clone())?;
1392 let batch = results.next().await.unwrap()?;
1393
1394 assert_eq!(6, batch.num_rows());
1395 assert_eq!(1, batch.num_columns());
1396
1398 let column = batch.column(0);
1399
1400 assert_eq!(column.len(), expected.len());
1401
1402 column
1403 .as_primitive::<arrow::datatypes::Int64Type>()
1404 .iter()
1405 .zip(expected.iter())
1406 .for_each(|(lhs, rhs)| {
1407 assert_eq!(lhs, rhs);
1408 });
1409 }
1410
1411 Ok(())
1412 }
1413
1414 #[tokio::test]
1415 async fn parquet_exec_with_int96_nested() -> Result<()> {
1416 let testdata = "../../datafusion/core/tests/data";
1422 let filename = "int96_nested.parquet";
1423 let session_ctx = SessionContext::new();
1424 let state = session_ctx.state();
1425 let task_ctx = state.task_ctx();
1426
1427 let parquet_exec = scan_format(
1428 &state,
1429 &ParquetFormat::default().with_coerce_int96(Some("us".to_string())),
1430 None,
1431 testdata,
1432 filename,
1433 None,
1434 None,
1435 )
1436 .await
1437 .unwrap();
1438 assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
1439
1440 let mut results = parquet_exec.execute(0, task_ctx.clone())?;
1441 let batch = results.next().await.unwrap()?;
1442
1443 let expected_schema = Arc::new(Schema::new(vec![
1444 Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1445 Field::new_struct(
1446 "c1",
1447 vec![Field::new(
1448 "c0",
1449 DataType::Timestamp(TimeUnit::Microsecond, None),
1450 true,
1451 )],
1452 true,
1453 ),
1454 Field::new_struct(
1455 "c2",
1456 vec![Field::new_list(
1457 "c0",
1458 Field::new(
1459 "element",
1460 DataType::Timestamp(TimeUnit::Microsecond, None),
1461 true,
1462 ),
1463 true,
1464 )],
1465 true,
1466 ),
1467 Field::new_map(
1468 "c3",
1469 "key_value",
1470 Field::new(
1471 "key",
1472 DataType::Timestamp(TimeUnit::Microsecond, None),
1473 false,
1474 ),
1475 Field::new(
1476 "value",
1477 DataType::Timestamp(TimeUnit::Microsecond, None),
1478 true,
1479 ),
1480 false,
1481 true,
1482 ),
1483 Field::new_list(
1484 "c4",
1485 Field::new(
1486 "element",
1487 DataType::Timestamp(TimeUnit::Microsecond, None),
1488 true,
1489 ),
1490 true,
1491 ),
1492 Field::new_list(
1493 "c5",
1494 Field::new_struct(
1495 "element",
1496 vec![Field::new(
1497 "c0",
1498 DataType::Timestamp(TimeUnit::Microsecond, None),
1499 true,
1500 )],
1501 true,
1502 ),
1503 true,
1504 ),
1505 Field::new_list(
1506 "c6",
1507 Field::new_map(
1508 "element",
1509 "key_value",
1510 Field::new(
1511 "key",
1512 DataType::Timestamp(TimeUnit::Microsecond, None),
1513 false,
1514 ),
1515 Field::new(
1516 "value",
1517 DataType::Timestamp(TimeUnit::Microsecond, None),
1518 true,
1519 ),
1520 false,
1521 true,
1522 ),
1523 true,
1524 ),
1525 ]));
1526
1527 assert_eq!(batch.schema(), expected_schema);
1528
1529 Ok(())
1530 }
1531
1532 #[tokio::test]
1533 async fn parquet_exec_with_range() -> Result<()> {
1534 fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
1535 PartitionedFile {
1536 object_meta: meta.clone(),
1537 partition_values: vec![],
1538 range: Some(FileRange { start, end }),
1539 statistics: None,
1540 extensions: None,
1541 metadata_size_hint: None,
1542 }
1543 }
1544
1545 async fn assert_parquet_read(
1546 state: &SessionState,
1547 file_groups: Vec<FileGroup>,
1548 expected_row_num: Option<usize>,
1549 file_schema: SchemaRef,
1550 ) -> Result<()> {
1551 let config = FileScanConfigBuilder::new(
1552 ObjectStoreUrl::local_filesystem(),
1553 file_schema,
1554 Arc::new(ParquetSource::default()),
1555 )
1556 .with_file_groups(file_groups)
1557 .build();
1558
1559 let parquet_exec = DataSourceExec::from_data_source(config);
1560 assert_eq!(
1561 parquet_exec
1562 .properties()
1563 .output_partitioning()
1564 .partition_count(),
1565 1
1566 );
1567 let results = parquet_exec.execute(0, state.task_ctx())?.next().await;
1568
1569 if let Some(expected_row_num) = expected_row_num {
1570 let batch = results.unwrap()?;
1571 assert_eq!(expected_row_num, batch.num_rows());
1572 } else {
1573 assert!(results.is_none());
1574 }
1575
1576 Ok(())
1577 }
1578
1579 let session_ctx = SessionContext::new();
1580 let state = session_ctx.state();
1581
1582 let testdata = datafusion_common::test_util::parquet_test_data();
1583 let filename = format!("{testdata}/alltypes_plain.parquet");
1584
1585 let meta = local_unpartitioned_file(filename);
1586
1587 let store = Arc::new(LocalFileSystem::new()) as _;
1588 let file_schema = ParquetFormat::default()
1589 .infer_schema(&state, &store, std::slice::from_ref(&meta))
1590 .await?;
1591
1592 let group_empty = vec![FileGroup::new(vec![file_range(&meta, 0, 2)])];
1593 let group_contain = vec![FileGroup::new(vec![file_range(&meta, 2, i64::MAX)])];
1594 let group_all = vec![FileGroup::new(vec![
1595 file_range(&meta, 0, 2),
1596 file_range(&meta, 2, i64::MAX),
1597 ])];
1598
1599 assert_parquet_read(&state, group_empty, None, file_schema.clone()).await?;
1600 assert_parquet_read(&state, group_contain, Some(8), file_schema.clone()).await?;
1601 assert_parquet_read(&state, group_all, Some(8), file_schema).await?;
1602
1603 Ok(())
1604 }
1605
1606 #[tokio::test]
1607 async fn parquet_exec_with_partition() -> Result<()> {
1608 let session_ctx = SessionContext::new();
1609 let state = session_ctx.state();
1610 let task_ctx = session_ctx.task_ctx();
1611
1612 let object_store_url = ObjectStoreUrl::local_filesystem();
1613 let store = state.runtime_env().object_store(&object_store_url).unwrap();
1614
1615 let testdata = datafusion_common::test_util::parquet_test_data();
1616 let filename = format!("{testdata}/alltypes_plain.parquet");
1617
1618 let meta = local_unpartitioned_file(filename);
1619
1620 let schema = ParquetFormat::default()
1621 .infer_schema(&state, &store, std::slice::from_ref(&meta))
1622 .await
1623 .unwrap();
1624
1625 let partitioned_file = PartitionedFile {
1626 object_meta: meta,
1627 partition_values: vec![
1628 ScalarValue::from("2021"),
1629 ScalarValue::UInt8(Some(10)),
1630 ScalarValue::Dictionary(
1631 Box::new(DataType::UInt16),
1632 Box::new(ScalarValue::from("26")),
1633 ),
1634 ],
1635 range: None,
1636 statistics: None,
1637 extensions: None,
1638 metadata_size_hint: None,
1639 };
1640
1641 let expected_schema = Schema::new(vec![
1642 Field::new("id", DataType::Int32, true),
1643 Field::new("bool_col", DataType::Boolean, true),
1644 Field::new("tinyint_col", DataType::Int32, true),
1645 Field::new("month", DataType::UInt8, false),
1646 Field::new(
1647 "day",
1648 DataType::Dictionary(
1649 Box::new(DataType::UInt16),
1650 Box::new(DataType::Utf8),
1651 ),
1652 false,
1653 ),
1654 ]);
1655
1656 let source = Arc::new(ParquetSource::default());
1657 let config = FileScanConfigBuilder::new(object_store_url, schema.clone(), source)
1658 .with_file(partitioned_file)
1659 .with_projection_indices(Some(vec![0, 1, 2, 12, 13]))
1661 .with_table_partition_cols(vec![
1662 Field::new("year", DataType::Utf8, false),
1663 Field::new("month", DataType::UInt8, false),
1664 Field::new(
1665 "day",
1666 DataType::Dictionary(
1667 Box::new(DataType::UInt16),
1668 Box::new(DataType::Utf8),
1669 ),
1670 false,
1671 ),
1672 ])
1673 .build();
1674
1675 let parquet_exec = DataSourceExec::from_data_source(config);
1676 let partition_count = parquet_exec
1677 .data_source()
1678 .output_partitioning()
1679 .partition_count();
1680 assert_eq!(partition_count, 1);
1681 assert_eq!(parquet_exec.schema().as_ref(), &expected_schema);
1682
1683 let mut results = parquet_exec.execute(0, task_ctx)?;
1684 let batch = results.next().await.unwrap()?;
1685 assert_eq!(batch.schema().as_ref(), &expected_schema);
1686
1687 assert_snapshot!(batches_to_string(&[batch]),@r###"
1688 +----+----------+-------------+-------+-----+
1689 | id | bool_col | tinyint_col | month | day |
1690 +----+----------+-------------+-------+-----+
1691 | 4 | true | 0 | 10 | 26 |
1692 | 5 | false | 1 | 10 | 26 |
1693 | 6 | true | 0 | 10 | 26 |
1694 | 7 | false | 1 | 10 | 26 |
1695 | 2 | true | 0 | 10 | 26 |
1696 | 3 | false | 1 | 10 | 26 |
1697 | 0 | true | 0 | 10 | 26 |
1698 | 1 | false | 1 | 10 | 26 |
1699 +----+----------+-------------+-------+-----+
1700 "###);
1701
1702 let batch = results.next().await;
1703 assert!(batch.is_none());
1704
1705 Ok(())
1706 }
1707
1708 #[tokio::test]
1709 async fn parquet_exec_with_error() -> Result<()> {
1710 let session_ctx = SessionContext::new();
1711 let state = session_ctx.state();
1712 let location = Path::from_filesystem_path(".")
1713 .unwrap()
1714 .child("invalid.parquet");
1715
1716 let partitioned_file = PartitionedFile {
1717 object_meta: ObjectMeta {
1718 location,
1719 last_modified: Utc.timestamp_nanos(0),
1720 size: 1337,
1721 e_tag: None,
1722 version: None,
1723 },
1724 partition_values: vec![],
1725 range: None,
1726 statistics: None,
1727 extensions: None,
1728 metadata_size_hint: None,
1729 };
1730
1731 let file_schema = Arc::new(Schema::empty());
1732 let config = FileScanConfigBuilder::new(
1733 ObjectStoreUrl::local_filesystem(),
1734 file_schema,
1735 Arc::new(ParquetSource::default()),
1736 )
1737 .with_file(partitioned_file)
1738 .build();
1739
1740 let parquet_exec = DataSourceExec::from_data_source(config);
1741
1742 let mut results = parquet_exec.execute(0, state.task_ctx())?;
1743 let batch = results.next().await.unwrap();
1744 assert_contains!(batch.unwrap_err().to_string(), "invalid.parquet not found");
1746 assert!(results.next().await.is_none());
1747
1748 Ok(())
1749 }
1750
1751 #[tokio::test]
1752 async fn parquet_page_index_exec_metrics() {
1753 let c1: ArrayRef = Arc::new(Int32Array::from(vec![
1754 Some(1),
1755 None,
1756 Some(2),
1757 Some(3),
1758 Some(4),
1759 Some(5),
1760 ]));
1761 let batch1 = create_batch(vec![("int", c1.clone())]);
1762
1763 let filter = col("int").eq(lit(4_i32));
1764
1765 let rt = RoundTrip::new()
1766 .with_predicate(filter)
1767 .with_page_index_predicate()
1768 .round_trip(vec![batch1])
1769 .await;
1770
1771 let metrics = rt.parquet_exec.metrics().unwrap();
1772
1773 assert_snapshot!(batches_to_sort_string(&rt.batches.unwrap()),@r###"
1774 +-----+
1775 | int |
1776 +-----+
1777 | 4 |
1778 | 5 |
1779 +-----+
1780 "###);
1781 let (page_index_pruned, page_index_matched) =
1782 get_pruning_metric(&metrics, "page_index_rows_pruned");
1783 assert_eq!(page_index_pruned, 4);
1784 assert_eq!(page_index_matched, 2);
1785 assert!(
1786 get_value(&metrics, "page_index_eval_time") > 0,
1787 "no eval time in metrics: {metrics:#?}"
1788 );
1789 }
1790
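// Single string column with values "Foo", NULL, "bar" (x4), and "zzz", shared
// by the metrics and display tests below.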
1791 fn string_batch() -> RecordBatch {
1794 let c1: ArrayRef = Arc::new(StringArray::from(vec![
1795 Some("Foo"),
1796 None,
1797 Some("bar"),
1798 Some("bar"),
1799 Some("bar"),
1800 Some("bar"),
1801 Some("zzz"),
1802 ]));
1803
1804 create_batch(vec![("c1", c1.clone())])
1806 }
1807
1808 #[tokio::test]
1809 async fn parquet_exec_metrics() {
1810 let batch1 = string_batch();
1812
1813 let filter = col("c1").not_eq(lit("bar"));
1815
1816 let rt = RoundTrip::new()
1818 .with_predicate(filter)
1819 .with_pushdown_predicate()
1820 .round_trip(vec![batch1])
1821 .await;
1822
1823 let metrics = rt.parquet_exec.metrics().unwrap();
1824
1825 assert_snapshot!(batches_to_string(&rt.batches.unwrap()),@r###"
1827 +-----+
1828 | c1 |
1829 +-----+
1830 | Foo |
1831 | zzz |
1832 +-----+
1833 "###);
1834
1835 assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 5);
1838 assert_eq!(get_value(&metrics, "pushdown_rows_matched"), 2);
1839 assert!(
1840 get_value(&metrics, "row_pushdown_eval_time") > 0,
1841 "no pushdown eval time in metrics: {metrics:#?}"
1842 );
1843 assert!(
1844 get_value(&metrics, "statistics_eval_time") > 0,
1845 "no statistics eval time in metrics: {metrics:#?}"
1846 );
1847 assert!(
1848 get_value(&metrics, "bloom_filter_eval_time") > 0,
1849 "no Bloom Filter eval time in metrics: {metrics:#?}"
1850 );
1851 }
1852
1853 #[tokio::test]
1854 async fn parquet_exec_display() {
1855 let batch1 = string_batch();
1857
1858 let filter = col("c1").not_eq(lit("bar"));
1860
1861 let rt = RoundTrip::new()
1862 .with_predicate(filter)
1863 .with_pushdown_predicate()
1864 .round_trip(vec![batch1])
1865 .await;
1866
1867 let explain = rt.explain.unwrap();
1868
1869 assert_contains!(&explain, "predicate=c1@0 != bar");
1871
1872 assert_contains!(
1874 &explain,
1875 "row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1876 );
1877
1878 assert_contains!(&explain, "projection=[c1]");
1880 }
1881
1882 #[tokio::test]
1883 async fn parquet_exec_has_no_pruning_predicate_if_can_not_prune() {
1884 let batch1 = string_batch();
1886
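// Wrapping the comparison in CASE WHEN ... prevents a pruning predicate from
// being derived, so row group statistics cannot eliminate anything (the
// assertions below expect 1 row group total -> 1 matched).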
1887 let filter = when(col("c1").not_eq(lit("bar")), lit(true))
1893 .otherwise(lit(false))
1894 .unwrap();
1895
1896 let rt = RoundTrip::new()
1897 .with_predicate(filter.clone())
1898 .with_pushdown_predicate()
1899 .round_trip(vec![batch1])
1900 .await;
1901
1902 let explain = rt.explain.unwrap();
1904
1905 assert_contains!(
1908 &explain,
1909 "row_groups_pruned_statistics=1 total \u{2192} 1 matched"
1910 );
1911
1912 assert_contains!(
1914 &explain,
1915 "predicate=CASE WHEN c1@0 != bar THEN true ELSE false END"
1916 );
1917 assert_contains!(&explain, "pushdown_rows_pruned=5");
1918 }
1919
1920 #[tokio::test]
1921 async fn parquet_exec_has_pruning_predicate_for_guarantees() {
1922 let batch1 = string_batch();
1924
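// The c1 = 'foo' conjunct provides a literal guarantee that can be checked
// against bloom filters, even though the CASE WHEN half of the predicate is
// not itself prunable; the assertion below expects one row group to be
// pruned by the bloom filter.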
1925 let filter = col("c1").eq(lit("foo")).and(
1931 when(col("c1").not_eq(lit("bar")), lit(true))
1932 .otherwise(lit(false))
1933 .unwrap(),
1934 );
1935
1936 let rt = RoundTrip::new()
1937 .with_predicate(filter.clone())
1938 .with_pushdown_predicate()
1939 .with_bloom_filters()
1940 .round_trip(vec![batch1])
1941 .await;
1942
1943 let explain = rt.explain.unwrap();
1945 assert_contains!(
1946 &explain,
1947 "predicate=c1@0 = foo AND CASE WHEN c1@0 != bar THEN true ELSE false END"
1948 );
1949
1950 assert_contains!(&explain, "row_groups_pruned_bloom_filter=1");
1952 }
1953
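// Sums the named metric across the plan; for pruning metrics this returns the
// pruned count. Panics if the metric is not present.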
1954 fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
1962 match metrics.sum_by_name(metric_name) {
1963 Some(v) => match v {
1964 MetricValue::PruningMetrics {
1965 pruning_metrics, ..
1966 } => pruning_metrics.pruned(),
1967 _ => v.as_usize(),
1968 },
1969 _ => {
1970 panic!(
1971 "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1972 );
1973 }
1974 }
1975 }
1976
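// Returns (pruned, matched) counts for the named pruning metric, panicking if
// the metric is missing or is not a pruning metric.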
1977 fn get_pruning_metric(metrics: &MetricsSet, metric_name: &str) -> (usize, usize) {
1978 match metrics.sum_by_name(metric_name) {
1979 Some(MetricValue::PruningMetrics {
1980 pruning_metrics, ..
1981 }) => (pruning_metrics.pruned(), pruning_metrics.matched()),
1982 Some(_) => panic!(
1983 "Metric '{metric_name}' is not a pruning metric in\n\n{metrics:#?}"
1984 ),
1985 None => panic!(
1986 "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}"
1987 ),
1988 }
1989 }
1990
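// Writes `partition_count` CSV files into `tmp_dir`, each with 11 rows of the
// form (partition index, i, i % 2 == 0) for i in 0..=10, and returns the
// schema they share.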
1991 fn populate_csv_partitions(
1992 tmp_dir: &TempDir,
1993 partition_count: usize,
1994 file_extension: &str,
1995 ) -> Result<SchemaRef> {
1996 let schema = Arc::new(Schema::new(vec![
1998 Field::new("c1", DataType::UInt32, false),
1999 Field::new("c2", DataType::UInt64, false),
2000 Field::new("c3", DataType::Boolean, false),
2001 ]));
2002
2003 for partition in 0..partition_count {
2005 let filename = format!("partition-{partition}.{file_extension}");
2006 let file_path = tmp_dir.path().join(filename);
2007 let mut file = File::create(file_path)?;
2008
2009 for i in 0..=10 {
2011 let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
2012 file.write_all(data.as_bytes())?;
2013 }
2014 }
2015
2016 Ok(schema)
2017 }
2018
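// End-to-end check: register CSV partitions, write the query result to a
// Parquet listing table, then read it back both as a single file and as the
// whole directory and compare schemas and row counts.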
2019 #[tokio::test]
2020 async fn write_table_results() -> Result<()> {
2021 let tmp_dir = TempDir::new()?;
2023 let ctx = SessionContext::new_with_config(
2025 SessionConfig::new().with_target_partitions(8),
2026 );
2027 let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
2028 ctx.register_csv(
2030 "test",
2031 tmp_dir.path().to_str().unwrap(),
2032 CsvReadOptions::new().schema(&schema),
2033 )
2034 .await?;
2035
2036 let local = Arc::new(LocalFileSystem::new_with_prefix(&tmp_dir)?);
2038 let local_url = Url::parse("file://local").unwrap();
2039 ctx.register_object_store(&local_url, local);
2040
2041 let file_format = ParquetFormat::default().with_enable_pruning(true);
2043 let listing_options = ListingOptions::new(Arc::new(file_format))
2044 .with_file_extension(ParquetFormat::default().get_ext());
2045
2046 let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
2048 fs::create_dir(&out_dir).unwrap();
2049 let df = ctx.sql("SELECT c1, c2 FROM test").await?;
2050 let schema = Arc::clone(df.schema().inner());
2051 ctx.register_listing_table(
2054 "my_table",
2055 &out_dir,
2056 listing_options,
2057 Some(schema),
2058 None,
2059 )
2060 .await
2061 .unwrap();
2062 df.write_table("my_table", DataFrameWriteOptions::new())
2063 .await?;
2064
2065 let ctx = SessionContext::new();
2067
2068 let mut paths = fs::read_dir(&out_dir).unwrap();
2070 let path = paths.next();
2071 let name = path
2072 .unwrap()?
2073 .path()
2074 .file_name()
2075 .expect("Should be a file name")
2076 .to_str()
2077 .expect("Should be a str")
2078 .to_owned();
2079 let (parsed_id, _) = name.split_once('_').expect("file name should contain '_'");
2080 let write_id = parsed_id.to_owned();
2081
2082 ctx.register_parquet(
2084 "part0",
2085 &format!("{out_dir}/{write_id}_0.parquet"),
2086 ParquetReadOptions::default(),
2087 )
2088 .await?;
2089
2090 ctx.register_parquet("allparts", &out_dir, ParquetReadOptions::default())
2091 .await?;
2092
2093 let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
2094 let allparts = ctx
2095 .sql("SELECT c1, c2 FROM allparts")
2096 .await?
2097 .collect()
2098 .await?;
2099
2100 let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
2101
2102 assert_eq!(part0[0].schema(), allparts[0].schema());
2103
2104 assert_eq!(allparts_count, 40);
2105
2106 Ok(())
2107 }
2108
2109 #[tokio::test]
2110 async fn test_struct_filter_parquet() -> Result<()> {
2111 let tmp_dir = TempDir::new()?;
2112 let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
2113 write_file(&path);
2114 let ctx = SessionContext::new();
2115 let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
2116 ctx.register_listing_table("base_table", path, opt, None, None)
2117 .await
2118 .unwrap();
2119 let sql = "select * from base_table where name='test02'";
2120 let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
2121 assert_eq!(batch.len(), 1);
2122 insta::assert_snapshot!(batches_to_string(&batch),@r###"
2123 +---------------------+----+--------+
2124 | struct | id | name |
2125 +---------------------+----+--------+
2126 | {id: 4, name: aaa2} | 2 | test02 |
2127 +---------------------+----+--------+
2128 "###);
2129 Ok(())
2130 }
2131
2132 #[tokio::test]
2133 async fn test_struct_filter_parquet_with_view_types() -> Result<()> {
2134 let tmp_dir = TempDir::new().unwrap();
2135 let path = tmp_dir.path().to_str().unwrap().to_string() + "/test.parquet";
2136 write_file(&path);
2137
2138 let ctx = SessionContext::new();
2139
2140 let mut options = TableParquetOptions::default();
2141 options.global.schema_force_view_types = true;
2142 let opt =
2143 ListingOptions::new(Arc::new(ParquetFormat::default().with_options(options)));
2144
2145 ctx.register_listing_table("base_table", path, opt, None, None)
2146 .await
2147 .unwrap();
2148 let sql = "select * from base_table where name='test02'";
2149 let batch = ctx.sql(sql).await.unwrap().collect().await.unwrap();
2150 assert_eq!(batch.len(), 1);
2151 insta::assert_snapshot!(batches_to_string(&batch),@r###"
2152 +---------------------+----+--------+
2153 | struct | id | name |
2154 +---------------------+----+--------+
2155 | {id: 4, name: aaa2} | 2 | test02 |
2156 +---------------------+----+--------+
2157 "###);
2158 Ok(())
2159 }
2160
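// Writes a two-row Parquet file containing a struct column plus top-level
// `id` and `name` columns; used by the struct-filter tests above.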
2161 fn write_file(file: &str) {
2162 let struct_fields = Fields::from(vec![
2163 Field::new("id", DataType::Int64, false),
2164 Field::new("name", DataType::Utf8, false),
2165 ]);
2166 let schema = Schema::new(vec![
2167 Field::new("struct", DataType::Struct(struct_fields.clone()), false),
2168 Field::new("id", DataType::Int64, true),
2169 Field::new("name", DataType::Utf8, false),
2170 ]);
2171 let id_array = Int64Array::from(vec![Some(1), Some(2)]);
2172 let columns = vec![
2173 Arc::new(Int64Array::from(vec![3, 4])) as _,
2174 Arc::new(StringArray::from(vec!["aaa1", "aaa2"])) as _,
2175 ];
2176 let struct_array = StructArray::new(struct_fields, columns, None);
2177
2178 let name_array = StringArray::from(vec![Some("test01"), Some("test02")]);
2179 let schema = Arc::new(schema);
2180
2181 let batch = RecordBatch::try_new(
2182 schema.clone(),
2183 vec![
2184 Arc::new(struct_array),
2185 Arc::new(id_array),
2186 Arc::new(name_array),
2187 ],
2188 )
2189 .unwrap();
2190 let file = File::create(file).unwrap();
2191 let w_opt = WriterProperties::builder().build();
2192 let mut writer = ArrowWriter::try_new(file, schema, Some(w_opt)).unwrap();
2193 writer.write(&batch).unwrap();
2194 writer.flush().unwrap();
2195 writer.close().unwrap();
2196 }
2197
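// Serializes `batch` to Parquet in memory, stores it at `path` in the given
// object store, and returns the file size in bytes.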
2198 async fn write_batch(
2200 path: &str,
2201 store: Arc<dyn ObjectStore>,
2202 batch: RecordBatch,
2203 ) -> u64 {
2204 let mut writer =
2205 ArrowWriter::try_new(BytesMut::new().writer(), batch.schema(), None).unwrap();
2206 writer.write(&batch).unwrap();
2207 writer.flush().unwrap();
2208 let bytes = writer.into_inner().unwrap().into_inner().freeze();
2209 let total_size = bytes.len() as u64;
2210 let path = Path::from(path);
2211 let payload = object_store::PutPayload::from_bytes(bytes);
2212 store
2213 .put_opts(&path, payload, object_store::PutOptions::default())
2214 .await
2215 .unwrap();
2216 total_size
2217 }
2218
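// `ParquetFileReaderFactory` wrapper that records the `metadata_size_hint`
// passed to each `create_reader` call before delegating to the default
// factory, so tests can assert which hint was used per file.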
2219 #[derive(Debug, Clone)]
2221 struct TrackingParquetFileReaderFactory {
2222 inner: Arc<dyn ParquetFileReaderFactory>,
2223 metadata_size_hint_calls: Arc<Mutex<Vec<Option<usize>>>>,
2224 }
2225
2226 impl TrackingParquetFileReaderFactory {
2227 fn new(store: Arc<dyn ObjectStore>) -> Self {
2228 Self {
2229 inner: Arc::new(DefaultParquetFileReaderFactory::new(store)) as _,
2230 metadata_size_hint_calls: Arc::new(Mutex::new(vec![])),
2231 }
2232 }
2233 }
2234
2235 impl ParquetFileReaderFactory for TrackingParquetFileReaderFactory {
2236 fn create_reader(
2237 &self,
2238 partition_index: usize,
2239 partitioned_file: PartitionedFile,
2240 metadata_size_hint: Option<usize>,
2241 metrics: &ExecutionPlanMetricsSet,
2242 ) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
2243 {
2244 self.metadata_size_hint_calls
2245 .lock()
2246 .unwrap()
2247 .push(metadata_size_hint);
2248 self.inner.create_reader(
2249 partition_index,
2250 partitioned_file,
2251 metadata_size_hint,
2252 metrics,
2253 )
2254 }
2255 }
2256
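// A per-file metadata size hint should take precedence over the hint set on
// the source: the first file carries its own hint (123) while the second
// falls back to the source-level hint (456).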
2257 #[tokio::test]
2259 async fn test_metadata_size_hint() {
2260 let store =
2261 Arc::new(object_store::memory::InMemory::new()) as Arc<dyn ObjectStore>;
2262 let store_url = ObjectStoreUrl::parse("memory://test").unwrap();
2263
2264 let ctx = SessionContext::new();
2265 ctx.register_object_store(store_url.as_ref(), store.clone());
2266
2267 let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1)]));
2269 let batch = create_batch(vec![("c1", c1)]);
2270 let schema = batch.schema();
2271 let name_1 = "test1.parquet";
2272 let name_2 = "test2.parquet";
2273 let total_size_1 = write_batch(name_1, store.clone(), batch.clone()).await;
2274 let total_size_2 = write_batch(name_2, store.clone(), batch.clone()).await;
2275
2276 let reader_factory =
2277 Arc::new(TrackingParquetFileReaderFactory::new(store.clone()));
2278
2279 let size_hint_calls = reader_factory.metadata_size_hint_calls.clone();
2280
2281 let source = Arc::new(
2282 ParquetSource::default()
2283 .with_parquet_file_reader_factory(reader_factory)
2284 .with_metadata_size_hint(456),
2285 );
2286 let config = FileScanConfigBuilder::new(store_url, schema, source)
2287 .with_file(
2288 PartitionedFile {
2289 object_meta: ObjectMeta {
2290 location: Path::from(name_1),
2291 last_modified: Utc::now(),
2292 size: total_size_1,
2293 e_tag: None,
2294 version: None,
2295 },
2296 partition_values: vec![],
2297 range: None,
2298 statistics: None,
2299 extensions: None,
2300 metadata_size_hint: None,
2301 }
2302 .with_metadata_size_hint(123),
2303 )
2304 .with_file(PartitionedFile {
2305 object_meta: ObjectMeta {
2306 location: Path::from(name_2),
2307 last_modified: Utc::now(),
2308 size: total_size_2,
2309 e_tag: None,
2310 version: None,
2311 },
2312 partition_values: vec![],
2313 range: None,
2314 statistics: None,
2315 extensions: None,
2316 metadata_size_hint: None,
2317 })
2318 .build();
2319
2320 let exec = DataSourceExec::from_data_source(config);
2321
2322 let res = collect(exec, ctx.task_ctx()).await.unwrap();
2323 assert_eq!(res.len(), 2);
2324
2325 let calls = size_hint_calls.lock().unwrap().clone();
2326 assert_eq!(calls.len(), 2);
2327 assert_eq!(calls, vec![Some(123), Some(456)]);
2328 }
2329}