1use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::internal_err;
25use datafusion_common::{HashMap, Result, ScalarValue};
26use datafusion_datasource::ListingTableUrl;
27use datafusion_datasource::PartitionedFile;
28use datafusion_expr::{BinaryExpr, Operator};
29
30use arrow::{
31 array::{Array, ArrayRef, AsArray, StringBuilder},
32 compute::{and, cast, prep_null_mask_filter},
33 datatypes::{DataType, Field, Fields, Schema},
34 record_batch::RecordBatch,
35};
36use datafusion_expr::execution_props::ExecutionProps;
37use futures::stream::FuturesUnordered;
38use futures::{stream::BoxStream, StreamExt, TryStreamExt};
39use log::{debug, trace};
40
41use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
42use datafusion_common::{Column, DFSchema, DataFusionError};
43use datafusion_expr::{Expr, Volatility};
44use datafusion_physical_expr::create_physical_expr;
45use object_store::path::Path;
46use object_store::{ObjectMeta, ObjectStore};
47
48pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
54 let mut is_applicable = true;
55 expr.apply_with_lambdas_params(|expr, lambdas_params| match expr {
56 Expr::Column(col) => {
57 is_applicable &= col_names.contains(&col.name()) || col.is_lambda_parameter(lambdas_params);
58 if is_applicable {
59 Ok(TreeNodeRecursion::Jump)
60 } else {
61 Ok(TreeNodeRecursion::Stop)
62 }
63 }
64 Expr::Literal(_, _)
65 | Expr::Alias(_)
66 | Expr::OuterReferenceColumn(_, _)
67 | Expr::ScalarVariable(_, _)
68 | Expr::Not(_)
69 | Expr::IsNotNull(_)
70 | Expr::IsNull(_)
71 | Expr::IsTrue(_)
72 | Expr::IsFalse(_)
73 | Expr::IsUnknown(_)
74 | Expr::IsNotTrue(_)
75 | Expr::IsNotFalse(_)
76 | Expr::IsNotUnknown(_)
77 | Expr::Negative(_)
78 | Expr::Cast(_)
79 | Expr::TryCast(_)
80 | Expr::BinaryExpr(_)
81 | Expr::Between(_)
82 | Expr::Like(_)
83 | Expr::SimilarTo(_)
84 | Expr::InList(_)
85 | Expr::Exists(_)
86 | Expr::InSubquery(_)
87 | Expr::ScalarSubquery(_)
88 | Expr::GroupingSet(_)
89 | Expr::Case(_)
90 | Expr::Lambda(_) => Ok(TreeNodeRecursion::Continue),
91
92 Expr::ScalarFunction(scalar_function) => {
93 match scalar_function.func.signature().volatility {
94 Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
95 Volatility::Stable | Volatility::Volatile => {
97 is_applicable = false;
98 Ok(TreeNodeRecursion::Stop)
99 }
100 }
101 }
102
103 #[expect(deprecated)]
109 Expr::AggregateFunction { .. }
110 | Expr::WindowFunction { .. }
111 | Expr::Wildcard { .. }
112 | Expr::Unnest { .. }
113 | Expr::Placeholder(_) => {
114 is_applicable = false;
115 Ok(TreeNodeRecursion::Stop)
116 }
117 })
118 .unwrap();
119 is_applicable
120}
121
/// Upper bound on concurrent object store work. It caps both the number of
/// in-flight directory listings in `list_partitions` and the number of
/// partitions whose files are resolved concurrently in `pruned_partition_list`.
const CONCURRENCY_LIMIT: usize = 100;
124
125#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
127pub fn split_files(
128 mut partitioned_files: Vec<PartitionedFile>,
129 n: usize,
130) -> Vec<Vec<PartitionedFile>> {
131 if partitioned_files.is_empty() {
132 return vec![];
133 }
134
135 partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
139
140 let chunk_size = partitioned_files.len().div_ceil(n);
142 let mut chunks = Vec::with_capacity(n);
143 let mut current_chunk = Vec::with_capacity(chunk_size);
144 for file in partitioned_files.drain(..) {
145 current_chunk.push(file);
146 if current_chunk.len() == chunk_size {
147 let full_chunk =
148 mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
149 chunks.push(full_chunk);
150 }
151 }
152
153 if !current_chunk.is_empty() {
154 chunks.push(current_chunk)
155 }
156
157 chunks
158}
159
/// A directory visited while discovering the partitions of a listing table.
#[derive(Debug)]
pub struct Partition {
    /// Object store path of the directory, including the table prefix.
    path: Path,
    /// Listing depth relative to the partition the listing started from
    /// (0 for the starting partition; each child is one deeper than its parent).
    depth: usize,
    /// Non-empty objects that are direct children of this directory, as found by
    /// `Partition::list`; `None` if this directory has not been listed.
    files: Option<Vec<ObjectMeta>>,
}
170
171impl Partition {
172 async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
175 trace!("Listing partition {}", self.path);
176 let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
177 let result = store.list_with_delimiter(prefix).await?;
178 self.files = Some(
179 result
180 .objects
181 .into_iter()
182 .filter(|object_meta| object_meta.size > 0)
183 .collect(),
184 );
185 Ok((self, result.common_prefixes))
186 }
187}
188
189pub async fn list_partitions(
191 store: &dyn ObjectStore,
192 table_path: &ListingTableUrl,
193 max_depth: usize,
194 partition_prefix: Option<Path>,
195) -> Result<Vec<Partition>> {
196 let partition = Partition {
197 path: match partition_prefix {
198 Some(prefix) => Path::from_iter(
199 Path::from(table_path.prefix().as_ref())
200 .parts()
201 .chain(Path::from(prefix.as_ref()).parts()),
202 ),
203 None => table_path.prefix().clone(),
204 },
205 depth: 0,
206 files: None,
207 };
208
209 let mut out = Vec::with_capacity(64);
210
211 let mut pending = vec![];
212 let mut futures = FuturesUnordered::new();
213 futures.push(partition.list(store));
214
215 while let Some((partition, paths)) = futures.next().await.transpose()? {
216 if let Some(next) = pending.pop() {
220 futures.push(next)
221 }
222
223 let depth = partition.depth;
224 out.push(partition);
225 for path in paths {
226 let child = Partition {
227 path,
228 depth: depth + 1,
229 files: None,
230 };
231 match depth < max_depth {
232 true => match futures.len() < CONCURRENCY_LIMIT {
233 true => futures.push(child.list(store)),
234 false => pending.push(child.list(store)),
235 },
236 false => out.push(child),
237 }
238 }
239 }
240 Ok(out)
241}
242
243async fn prune_partitions(
244 table_path: &ListingTableUrl,
245 partitions: Vec<Partition>,
246 filters: &[Expr],
247 partition_cols: &[(String, DataType)],
248) -> Result<Vec<Partition>> {
249 if filters.is_empty() {
250 return Ok(partitions
252 .into_iter()
253 .filter(|p| {
254 let cols = partition_cols.iter().map(|x| x.0.as_str());
255 !parse_partitions_for_path(table_path, &p.path, cols)
256 .unwrap_or_default()
257 .is_empty()
258 })
259 .collect());
260 }
261
262 let mut builders: Vec<_> = (0..partition_cols.len())
263 .map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
264 .collect();
265
266 for partition in &partitions {
267 let cols = partition_cols.iter().map(|x| x.0.as_str());
268 let parsed = parse_partitions_for_path(table_path, &partition.path, cols)
269 .unwrap_or_default();
270
271 let mut builders = builders.iter_mut();
272 for (p, b) in parsed.iter().zip(&mut builders) {
273 b.append_value(p);
274 }
275 builders.for_each(|b| b.append_null());
276 }
277
278 let arrays = partition_cols
279 .iter()
280 .zip(builders)
281 .map(|((_, d), mut builder)| {
282 let array = builder.finish();
283 cast(&array, d)
284 })
285 .collect::<Result<_, _>>()?;
286
287 let fields: Fields = partition_cols
288 .iter()
289 .map(|(n, d)| Field::new(n, d.clone(), true))
290 .collect();
291 let schema = Arc::new(Schema::new(fields));
292
293 let df_schema = DFSchema::from_unqualified_fields(
294 partition_cols
295 .iter()
296 .map(|(n, d)| Field::new(n, d.clone(), true))
297 .collect(),
298 Default::default(),
299 )?;
300
301 let batch = RecordBatch::try_new(schema, arrays)?;
302
303 let props = ExecutionProps::new();
305
306 let do_filter = |filter| -> Result<ArrayRef> {
308 let expr = create_physical_expr(filter, &df_schema, &props)?;
309 expr.evaluate(&batch)?.into_array(partitions.len())
310 };
311
312 let mask = filters
314 .iter()
315 .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
316 .reduce(|a, b| Ok(and(&a?, &b?)?));
317
318 let mask = match mask {
319 Some(Ok(mask)) => mask,
320 Some(Err(err)) => return Err(err),
321 None => return Ok(partitions),
322 };
323
324 let prepared = match mask.null_count() {
326 0 => mask,
327 _ => prep_null_mask_filter(&mask),
328 };
329
330 assert_eq!(prepared.len(), partitions.len());
332
333 let filtered = partitions
334 .into_iter()
335 .zip(prepared.values())
336 .filter_map(|(p, f)| f.then_some(p))
337 .collect();
338
339 Ok(filtered)
340}
341
/// What the equality filters say about one column, as collected by
/// `populate_partition_values`.
#[derive(Debug)]
enum PartitionValue {
    /// Exactly one `column = literal` equality was seen; holds the literal
    /// rendered with `to_string()`.
    Single(String),
    /// More than one equality was seen for the column, so no single value can be
    /// assumed.
    Multi,
}
347
348fn populate_partition_values<'a>(
349 partition_values: &mut HashMap<&'a str, PartitionValue>,
350 filter: &'a Expr,
351) {
352 if let Expr::BinaryExpr(BinaryExpr {
353 ref left,
354 op,
355 ref right,
356 }) = filter
357 {
358 match op {
359 Operator::Eq => match (left.as_ref(), right.as_ref()) {
360 (Expr::Column(Column { ref name, .. }), Expr::Literal(val, _))
361 | (Expr::Literal(val, _), Expr::Column(Column { ref name, .. })) => {
362 if partition_values
363 .insert(name, PartitionValue::Single(val.to_string()))
364 .is_some()
365 {
366 partition_values.insert(name, PartitionValue::Multi);
367 }
368 }
369 _ => {}
370 },
371 Operator::And => {
372 populate_partition_values(partition_values, left);
373 populate_partition_values(partition_values, right);
374 }
375 _ => {}
376 }
377 }
378}
379
380pub fn evaluate_partition_prefix<'a>(
381 partition_cols: &'a [(String, DataType)],
382 filters: &'a [Expr],
383) -> Option<Path> {
384 let mut partition_values = HashMap::new();
385 for filter in filters {
386 populate_partition_values(&mut partition_values, filter);
387 }
388
389 if partition_values.is_empty() {
390 return None;
391 }
392
393 let mut parts = vec![];
394 for (p, _) in partition_cols {
395 match partition_values.get(p.as_str()) {
396 Some(PartitionValue::Single(val)) => {
397 parts.push(format!("{p}={val}"));
400 }
401 _ => {
402 break;
405 }
406 }
407 }
408
409 if parts.is_empty() {
410 None
411 } else {
412 Some(Path::from_iter(parts))
413 }
414}
415
416pub async fn pruned_partition_list<'a>(
421 ctx: &'a dyn Session,
422 store: &'a dyn ObjectStore,
423 table_path: &'a ListingTableUrl,
424 filters: &'a [Expr],
425 file_extension: &'a str,
426 partition_cols: &'a [(String, DataType)],
427) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
428 if partition_cols.is_empty() {
430 if !filters.is_empty() {
431 return internal_err!(
432 "Got partition filters for unpartitioned table {}",
433 table_path
434 );
435 }
436 return Ok(Box::pin(
437 table_path
438 .list_all_files(ctx, store, file_extension)
439 .await?
440 .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
441 .map_ok(|object_meta| object_meta.into()),
442 ));
443 }
444
445 let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
446
447 let partitions =
448 list_partitions(store, table_path, partition_cols.len(), partition_prefix)
449 .await?;
450 debug!("Listed {} partitions", partitions.len());
451
452 let pruned =
453 prune_partitions(table_path, partitions, filters, partition_cols).await?;
454
455 debug!("Pruning yielded {} partitions", pruned.len());
456
457 let stream = futures::stream::iter(pruned)
458 .map(move |partition: Partition| async move {
459 let cols = partition_cols.iter().map(|x| x.0.as_str());
460 let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
461
462 let partition_values = parsed
463 .into_iter()
464 .flatten()
465 .zip(partition_cols)
466 .map(|(parsed, (_, datatype))| {
467 ScalarValue::try_from_string(parsed.to_string(), datatype)
468 })
469 .collect::<Result<Vec<_>>>()?;
470
471 let files = match partition.files {
472 Some(files) => files,
473 None => {
474 trace!("Recursively listing partition {}", partition.path);
475 store.list(Some(&partition.path)).try_collect().await?
476 }
477 };
478 let files = files.into_iter().filter(move |o| {
479 let extension_match = o.location.as_ref().ends_with(file_extension);
480 let glob_match = table_path.contains(&o.location, false);
482 extension_match && glob_match
483 });
484
485 let stream = futures::stream::iter(files.map(move |object_meta| {
486 Ok(PartitionedFile {
487 object_meta,
488 partition_values: partition_values.clone(),
489 range: None,
490 statistics: None,
491 extensions: None,
492 metadata_size_hint: None,
493 })
494 }));
495
496 Ok::<_, DataFusionError>(stream)
497 })
498 .buffer_unordered(CONCURRENCY_LIMIT)
499 .try_flatten()
500 .boxed();
501 Ok(stream)
502}
503
504pub fn parse_partitions_for_path<'a, I>(
507 table_path: &ListingTableUrl,
508 file_path: &'a Path,
509 table_partition_cols: I,
510) -> Option<Vec<&'a str>>
511where
512 I: IntoIterator<Item = &'a str>,
513{
514 let subpath = table_path.strip_prefix(file_path)?;
515
516 let mut part_values = vec![];
517 for (part, expected_partition) in subpath.zip(table_partition_cols) {
518 match part.split_once('=') {
519 Some((name, val)) if name == expected_partition => part_values.push(val),
520 _ => {
521 debug!(
522 "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
523 );
524 return None;
525 }
526 }
527 }
528 Some(part_values)
529}
530pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
532 (
533 partition.path.as_ref(),
534 partition.depth,
535 partition
536 .files
537 .as_ref()
538 .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
539 .unwrap_or_default(),
540 )
541}
542
#[cfg(test)]
mod tests {
    use async_trait::async_trait;
    use datafusion_common::config::TableOptions;
    use datafusion_datasource::file_groups::FileGroup;
    use datafusion_execution::config::SessionConfig;
    use datafusion_execution::runtime_env::RuntimeEnv;
    use futures::FutureExt;
    use object_store::memory::InMemory;
    use std::any::Any;
    use std::ops::Not;

    use super::*;
    use datafusion_expr::{
        case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
    };
    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
    use datafusion_physical_plan::ExecutionPlan;

    /// `FileGroup::split_files` produces groups of `ceil(len / n)` files, never
    /// more groups than files, and nothing for an empty group.
    #[test]
    fn test_split_files() {
        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
        let files = FileGroup::new(vec![
            new_partitioned_file("a"),
            new_partitioned_file("b"),
            new_partitioned_file("c"),
            new_partitioned_file("d"),
            new_partitioned_file("e"),
        ]);

        let chunks = files.clone().split_files(1);
        assert_eq!(1, chunks.len());
        assert_eq!(5, chunks[0].len());

        let chunks = files.clone().split_files(2);
        assert_eq!(2, chunks.len());
        assert_eq!(3, chunks[0].len());
        assert_eq!(2, chunks[1].len());

        let chunks = files.clone().split_files(5);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        // Asking for more groups than files yields one file per group.
        let chunks = files.clone().split_files(123);
        assert_eq!(5, chunks.len());
        assert_eq!(1, chunks[0].len());
        assert_eq!(1, chunks[1].len());
        assert_eq!(1, chunks[2].len());
        assert_eq!(1, chunks[3].len());
        assert_eq!(1, chunks[4].len());

        let empty_group = FileGroup::default();
        let chunks = empty_group.split_files(2);
        assert_eq!(0, chunks.len());
    }

    /// Wrong extension, empty file, file outside any partition, non-partition
    /// directory and a directory for a different column all yield no files.
    #[tokio::test]
    async fn test_pruned_partition_list_empty() {
        let (store, state) = make_test_store_and_state(&[
            ("tablepath/mypartition=val1/notparquetfile", 100),
            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
            ("tablepath/file.parquet", 100),
            ("tablepath/notapartition/file.parquet", 100),
            ("tablepath/notmypartition=val1/file.parquet", 100),
        ]);
        let filter = Expr::eq(col("mypartition"), lit("val1"));
        let pruned = pruned_partition_list(
            state.as_ref(),
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            &[filter],
            ".parquet",
            &[(String::from("mypartition"), DataType::Utf8)],
        )
        .await
        .expect("partition pruning failed")
        .collect::<Vec<_>>()
        .await;

        assert_eq!(pruned.len(), 0);
    }

    /// Only non-empty `.parquet` files under `mypartition=val1` are returned,
    /// including ones in deeper sub-directories, each tagged with `val1`.
    #[tokio::test]
    async fn test_pruned_partition_list() {
        let (store, state) = make_test_store_and_state(&[
            ("tablepath/mypartition=val1/file.parquet", 100),
            ("tablepath/mypartition=val2/file.parquet", 100),
            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
            ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
            ("tablepath/notapartition/file.parquet", 100),
            ("tablepath/notmypartition=val1/file.parquet", 100),
        ]);
        let filter = Expr::eq(col("mypartition"), lit("val1"));
        let pruned = pruned_partition_list(
            state.as_ref(),
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            &[filter],
            ".parquet",
            &[(String::from("mypartition"), DataType::Utf8)],
        )
        .await
        .expect("partition pruning failed")
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

        assert_eq!(pruned.len(), 2);
        let f1 = &pruned[0];
        assert_eq!(
            f1.object_meta.location.as_ref(),
            "tablepath/mypartition=val1/file.parquet"
        );
        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
        let f2 = &pruned[1];
        assert_eq!(
            f2.object_meta.location.as_ref(),
            "tablepath/mypartition=val1/other=val3/file.parquet"
        );
        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
    }

    /// Filters on two partition columns select files under `part1=p1v2/part2=p2v1` only.
    #[tokio::test]
    async fn test_pruned_partition_list_multi() {
        let (store, state) = make_test_store_and_state(&[
            ("tablepath/part1=p1v1/file.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
            ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100),
        ]);
        let filter1 = Expr::eq(col("part1"), lit("p1v2"));
        let filter2 = Expr::eq(col("part2"), lit("p2v1"));
        let pruned = pruned_partition_list(
            state.as_ref(),
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            &[filter1, filter2],
            ".parquet",
            &[
                (String::from("part1"), DataType::Utf8),
                (String::from("part2"), DataType::Utf8),
            ],
        )
        .await
        .expect("partition pruning failed")
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

        assert_eq!(pruned.len(), 2);
        let f1 = &pruned[0];
        assert_eq!(
            f1.object_meta.location.as_ref(),
            "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
        );
        assert_eq!(
            &f1.partition_values,
            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),]
        );
        let f2 = &pruned[1];
        assert_eq!(
            f2.object_meta.location.as_ref(),
            "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
        );
        assert_eq!(
            &f2.partition_values,
            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")]
        );
    }

    /// `list_partitions` lists directories down to `max_depth` and returns the
    /// next level unlisted (empty file list in `describe_partition`). The
    /// expected vectors pin the exact order produced for this in-memory store.
    #[tokio::test]
    async fn test_list_partition() {
        let (store, _) = make_test_store_and_state(&[
            ("tablepath/part1=p1v1/file.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
            ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
            ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
        ]);

        // max_depth = 0: only the root is listed; its children are returned unlisted.
        let partitions = list_partitions(
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            0,
            None,
        )
        .await
        .expect("listing partitions failed");

        assert_eq!(
            &partitions
                .iter()
                .map(describe_partition)
                .collect::<Vec<_>>(),
            &vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec![]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
            ]
        );

        // max_depth = 1: depth-1 directories are listed, depth-2 ones are not.
        let partitions = list_partitions(
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            1,
            None,
        )
        .await
        .expect("listing partitions failed");

        assert_eq!(
            &partitions
                .iter()
                .map(describe_partition)
                .collect::<Vec<_>>(),
            &vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
                ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
                ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
            ]
        );

        // max_depth = 2: every directory is listed; the zero-byte file is excluded.
        let partitions = list_partitions(
            store.as_ref(),
            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
            2,
            None,
        )
        .await
        .expect("listing partitions failed");

        assert_eq!(
            &partitions
                .iter()
                .map(describe_partition)
                .collect::<Vec<_>>(),
            &vec![
                ("tablepath", 0, vec![]),
                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
                ("tablepath/part1=p1v2", 1, vec![]),
                ("tablepath/part1=p1v3", 1, vec![]),
                (
                    "tablepath/part1=p1v2/part2=p2v1",
                    2,
                    vec!["file1.parquet", "file2.parquet"]
                ),
                ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
                ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
            ]
        );
    }

    #[test]
    fn test_parse_partitions_for_path() {
        // No partition columns: any file under the table yields an empty list.
        assert_eq!(
            Some(vec![]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // File outside the table path.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec![]
            )
        );
        // A partition column is expected but the file sits directly in the table.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // A trailing slash on the table URL makes no difference.
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
                vec!["mypartition"]
            )
        );
        // Only `name=value` directories are accepted as partitions.
        assert_eq!(
            None,
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/v1/file.csv"),
                vec!["mypartition"]
            )
        );
        assert_eq!(
            Some(vec!["v1", "v2"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition", "otherpartition"]
            )
        );
        // Extra directory levels beyond the declared columns are ignored.
        assert_eq!(
            Some(vec!["v1"]),
            parse_partitions_for_path(
                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
                vec!["mypartition"]
            )
        );
    }

    #[test]
    fn test_expr_applicable_for_cols() {
        assert!(expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c2"), lit("value"))
        ));
        assert!(!expr_applicable_for_cols(
            &["c1"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &Expr::eq(col("c1"), col("c2"))
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
        ));
        assert!(expr_applicable_for_cols(
            &["c1", "c2"],
            &(case(col("c1"))
                .when(lit("v1"), lit(true))
                .otherwise(lit(false))
                .expect("valid case expr"))
        ));
        // An expression without columns is always applicable.
        assert!(expr_applicable_for_cols(&[], &lit(true)));
    }

    #[test]
    fn test_evaluate_partition_prefix() {
        let partitions = &[
            ("a".to_string(), DataType::Utf8),
            ("b".to_string(), DataType::Int16),
            ("c".to_string(), DataType::Boolean),
        ];

        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
            Some(Path::from("a=foo")),
        );

        // Literal on the left-hand side.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
            Some(Path::from("a=foo")),
        );

        // Equalities combined with AND.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        // Equalities given as separate filters.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
            ),
            Some(Path::from("a=foo/b=bar")),
        );

        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a")
                    .eq(lit("foo"))
                    .and(col("b").eq(lit("1")))
                    .and(col("c").eq(lit("true")))],
            ),
            Some(Path::from("a=foo/b=1/c=true")),
        );

        // No filters at all.
        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);

        // The leading column `a` is not pinned, so no prefix can be formed.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
            None,
        );

        // `b` is not pinned, so the prefix stops after `a`.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
            ),
            Some(Path::from("a=foo")),
        );

        // Two different equalities on `a` mean no single value can be assumed.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );

        // OR is not used to derive a prefix.
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
            ),
            None,
        );
        // Non-equality comparisons are ignored.
        assert_eq!(
            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
            None,
        );
    }

    /// Date literals are rendered as `YYYY-MM-DD` in the prefix.
    #[test]
    fn test_evaluate_date_partition_prefix() {
        let partitions = &[("a".to_string(), DataType::Date32)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
            ),
            Some(Path::from("a=1970-01-04")),
        );

        let partitions = &[("a".to_string(), DataType::Date64)];
        assert_eq!(
            evaluate_partition_prefix(
                partitions,
                &[col("a").eq(Expr::Literal(
                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
                    None
                )),],
            ),
            Some(Path::from("a=1970-01-05")),
        );
    }

    /// Builds an in-memory store holding `files` (path, size in bytes, zero-filled)
    /// plus a `MockSession` whose methods are never expected to be called.
    pub fn make_test_store_and_state(
        files: &[(&str, u64)],
    ) -> (Arc<InMemory>, Arc<dyn Session>) {
        let memory = InMemory::new();

        for (name, size) in files {
            memory
                .put(&Path::from(*name), vec![0; *size as usize].into())
                .now_or_never()
                .unwrap()
                .unwrap();
        }

        (Arc::new(memory), Arc::new(MockSession {}))
    }

    /// A `Session` stub: every method is `unimplemented!()`.
    struct MockSession {}

    #[async_trait]
    impl Session for MockSession {
        fn session_id(&self) -> &str {
            unimplemented!()
        }

        fn config(&self) -> &SessionConfig {
            unimplemented!()
        }

        async fn create_physical_plan(
            &self,
            _logical_plan: &LogicalPlan,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }

        fn create_physical_expr(
            &self,
            _expr: Expr,
            _df_schema: &DFSchema,
        ) -> Result<Arc<dyn PhysicalExpr>> {
            unimplemented!()
        }

        fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
            unimplemented!()
        }

        fn aggregate_functions(
            &self,
        ) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
            unimplemented!()
        }

        fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
            unimplemented!()
        }

        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
            unimplemented!()
        }

        fn execution_props(&self) -> &ExecutionProps {
            unimplemented!()
        }

        fn as_any(&self) -> &dyn Any {
            unimplemented!()
        }

        fn table_options(&self) -> &TableOptions {
            unimplemented!()
        }

        fn table_options_mut(&mut self) -> &mut TableOptions {
            unimplemented!()
        }

        fn task_ctx(&self) -> Arc<datafusion_execution::TaskContext> {
            unimplemented!()
        }
    }
}