datafusion_catalog_listing/
helpers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Helper functions for the table implementation
19
20use std::mem;
21use std::sync::Arc;
22
23use datafusion_catalog::Session;
24use datafusion_common::internal_err;
25use datafusion_common::{HashMap, Result, ScalarValue};
26use datafusion_datasource::ListingTableUrl;
27use datafusion_datasource::PartitionedFile;
28use datafusion_expr::{BinaryExpr, Operator};
29
30use arrow::{
31    array::{Array, ArrayRef, AsArray, StringBuilder},
32    compute::{and, cast, prep_null_mask_filter},
33    datatypes::{DataType, Field, Fields, Schema},
34    record_batch::RecordBatch,
35};
36use datafusion_expr::execution_props::ExecutionProps;
37use futures::stream::FuturesUnordered;
38use futures::{stream::BoxStream, StreamExt, TryStreamExt};
39use log::{debug, trace};
40
41use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
42use datafusion_common::{Column, DFSchema, DataFusionError};
43use datafusion_expr::{Expr, Volatility};
44use datafusion_physical_expr::create_physical_expr;
45use object_store::path::Path;
46use object_store::{ObjectMeta, ObjectStore};
47
48/// Check whether the given expression can be resolved using only the columns `col_names`.
49/// This means that if this function returns true:
50/// - the table provider can filter the table partition values with this expression
51/// - the expression can be marked as `TableProviderFilterPushDown::Exact` once this filtering
52///   was performed
53pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
54    let mut is_applicable = true;
55    expr.apply_with_lambdas_params(|expr, lambdas_params| match expr {
56        Expr::Column(col) => {
57            is_applicable &= col_names.contains(&col.name()) || col.is_lambda_parameter(lambdas_params);
58            if is_applicable {
59                Ok(TreeNodeRecursion::Jump)
60            } else {
61                Ok(TreeNodeRecursion::Stop)
62            }
63        }
64        Expr::Literal(_, _)
65        | Expr::Alias(_)
66        | Expr::OuterReferenceColumn(_, _)
67        | Expr::ScalarVariable(_, _)
68        | Expr::Not(_)
69        | Expr::IsNotNull(_)
70        | Expr::IsNull(_)
71        | Expr::IsTrue(_)
72        | Expr::IsFalse(_)
73        | Expr::IsUnknown(_)
74        | Expr::IsNotTrue(_)
75        | Expr::IsNotFalse(_)
76        | Expr::IsNotUnknown(_)
77        | Expr::Negative(_)
78        | Expr::Cast(_)
79        | Expr::TryCast(_)
80        | Expr::BinaryExpr(_)
81        | Expr::Between(_)
82        | Expr::Like(_)
83        | Expr::SimilarTo(_)
84        | Expr::InList(_)
85        | Expr::Exists(_)
86        | Expr::InSubquery(_)
87        | Expr::ScalarSubquery(_)
88        | Expr::GroupingSet(_)
89        | Expr::Case(_)
90        | Expr::Lambda(_) => Ok(TreeNodeRecursion::Continue),
91
92        Expr::ScalarFunction(scalar_function) => {
93            match scalar_function.func.signature().volatility {
94                Volatility::Immutable => Ok(TreeNodeRecursion::Continue),
95                // TODO: Stable functions could be `applicable`, but that would require access to the context
96                Volatility::Stable | Volatility::Volatile => {
97                    is_applicable = false;
98                    Ok(TreeNodeRecursion::Stop)
99                }
100            }
101        }
102
103        // TODO other expressions are not handled yet:
104        // - AGGREGATE and WINDOW should not end up in filter conditions, except maybe in some edge cases
105        // - Can `Wildcard` be considered as a `Literal`?
106        // - ScalarVariable could be `applicable`, but that would require access to the context
107        // TODO: remove the next line after `Expr::Wildcard` is removed
108        #[expect(deprecated)]
109        Expr::AggregateFunction { .. }
110        | Expr::WindowFunction { .. }
111        | Expr::Wildcard { .. }
112        | Expr::Unnest { .. }
113        | Expr::Placeholder(_) => {
114            is_applicable = false;
115            Ok(TreeNodeRecursion::Stop)
116        }
117    })
118    .unwrap();
119    is_applicable
120}
121
/// The maximum number of concurrent listing requests
///
/// Caps the number of in-flight directory listings in [`list_partitions`] and the
/// number of partitions processed concurrently (via `buffer_unordered`) in
/// [`pruned_partition_list`].
const CONCURRENCY_LIMIT: usize = 100;
124
125/// Partition the list of files into `n` groups
126#[deprecated(since = "47.0.0", note = "use `FileGroup::split_files` instead")]
127pub fn split_files(
128    mut partitioned_files: Vec<PartitionedFile>,
129    n: usize,
130) -> Vec<Vec<PartitionedFile>> {
131    if partitioned_files.is_empty() {
132        return vec![];
133    }
134
135    // ObjectStore::list does not guarantee any consistent order and for some
136    // implementations such as LocalFileSystem, it may be inconsistent. Thus
137    // Sort files by path to ensure consistent plans when run more than once.
138    partitioned_files.sort_by(|a, b| a.path().cmp(b.path()));
139
140    // effectively this is div with rounding up instead of truncating
141    let chunk_size = partitioned_files.len().div_ceil(n);
142    let mut chunks = Vec::with_capacity(n);
143    let mut current_chunk = Vec::with_capacity(chunk_size);
144    for file in partitioned_files.drain(..) {
145        current_chunk.push(file);
146        if current_chunk.len() == chunk_size {
147            let full_chunk =
148                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
149            chunks.push(full_chunk);
150        }
151    }
152
153    if !current_chunk.is_empty() {
154        chunks.push(current_chunk)
155    }
156
157    chunks
158}
159
/// A directory discovered while walking a partitioned table's directory tree,
/// together with the files directly inside it once it has been listed.
#[derive(Debug)]
pub struct Partition {
    /// The path to the partition, including the table prefix
    path: Path,
    /// How many path segments below the table prefix `path` contains
    /// or equivalently the number of partition values in `path`
    depth: usize,
    /// The files contained as direct children of this `Partition` if known.
    /// `None` means the directory has not been listed yet.
    files: Option<Vec<ObjectMeta>>,
}
170
171impl Partition {
172    /// List the direct children of this partition updating `self.files` with
173    /// any child files, and returning a list of child "directories"
174    async fn list(mut self, store: &dyn ObjectStore) -> Result<(Self, Vec<Path>)> {
175        trace!("Listing partition {}", self.path);
176        let prefix = Some(&self.path).filter(|p| !p.as_ref().is_empty());
177        let result = store.list_with_delimiter(prefix).await?;
178        self.files = Some(
179            result
180                .objects
181                .into_iter()
182                .filter(|object_meta| object_meta.size > 0)
183                .collect(),
184        );
185        Ok((self, result.common_prefixes))
186    }
187}
188
189/// Returns a recursive list of the partitions in `table_path` up to `max_depth`
190pub async fn list_partitions(
191    store: &dyn ObjectStore,
192    table_path: &ListingTableUrl,
193    max_depth: usize,
194    partition_prefix: Option<Path>,
195) -> Result<Vec<Partition>> {
196    let partition = Partition {
197        path: match partition_prefix {
198            Some(prefix) => Path::from_iter(
199                Path::from(table_path.prefix().as_ref())
200                    .parts()
201                    .chain(Path::from(prefix.as_ref()).parts()),
202            ),
203            None => table_path.prefix().clone(),
204        },
205        depth: 0,
206        files: None,
207    };
208
209    let mut out = Vec::with_capacity(64);
210
211    let mut pending = vec![];
212    let mut futures = FuturesUnordered::new();
213    futures.push(partition.list(store));
214
215    while let Some((partition, paths)) = futures.next().await.transpose()? {
216        // If pending contains a future it implies prior to this iteration
217        // `futures.len == CONCURRENCY_LIMIT`. We can therefore add a single
218        // future from `pending` to the working set
219        if let Some(next) = pending.pop() {
220            futures.push(next)
221        }
222
223        let depth = partition.depth;
224        out.push(partition);
225        for path in paths {
226            let child = Partition {
227                path,
228                depth: depth + 1,
229                files: None,
230            };
231            match depth < max_depth {
232                true => match futures.len() < CONCURRENCY_LIMIT {
233                    true => futures.push(child.list(store)),
234                    false => pending.push(child.list(store)),
235                },
236                false => out.push(child),
237            }
238        }
239    }
240    Ok(out)
241}
242
243async fn prune_partitions(
244    table_path: &ListingTableUrl,
245    partitions: Vec<Partition>,
246    filters: &[Expr],
247    partition_cols: &[(String, DataType)],
248) -> Result<Vec<Partition>> {
249    if filters.is_empty() {
250        // prune partitions which don't contain the partition columns
251        return Ok(partitions
252            .into_iter()
253            .filter(|p| {
254                let cols = partition_cols.iter().map(|x| x.0.as_str());
255                !parse_partitions_for_path(table_path, &p.path, cols)
256                    .unwrap_or_default()
257                    .is_empty()
258            })
259            .collect());
260    }
261
262    let mut builders: Vec<_> = (0..partition_cols.len())
263        .map(|_| StringBuilder::with_capacity(partitions.len(), partitions.len() * 10))
264        .collect();
265
266    for partition in &partitions {
267        let cols = partition_cols.iter().map(|x| x.0.as_str());
268        let parsed = parse_partitions_for_path(table_path, &partition.path, cols)
269            .unwrap_or_default();
270
271        let mut builders = builders.iter_mut();
272        for (p, b) in parsed.iter().zip(&mut builders) {
273            b.append_value(p);
274        }
275        builders.for_each(|b| b.append_null());
276    }
277
278    let arrays = partition_cols
279        .iter()
280        .zip(builders)
281        .map(|((_, d), mut builder)| {
282            let array = builder.finish();
283            cast(&array, d)
284        })
285        .collect::<Result<_, _>>()?;
286
287    let fields: Fields = partition_cols
288        .iter()
289        .map(|(n, d)| Field::new(n, d.clone(), true))
290        .collect();
291    let schema = Arc::new(Schema::new(fields));
292
293    let df_schema = DFSchema::from_unqualified_fields(
294        partition_cols
295            .iter()
296            .map(|(n, d)| Field::new(n, d.clone(), true))
297            .collect(),
298        Default::default(),
299    )?;
300
301    let batch = RecordBatch::try_new(schema, arrays)?;
302
303    // TODO: Plumb this down
304    let props = ExecutionProps::new();
305
306    // Applies `filter` to `batch` returning `None` on error
307    let do_filter = |filter| -> Result<ArrayRef> {
308        let expr = create_physical_expr(filter, &df_schema, &props)?;
309        expr.evaluate(&batch)?.into_array(partitions.len())
310    };
311
312    //.Compute the conjunction of the filters
313    let mask = filters
314        .iter()
315        .map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
316        .reduce(|a, b| Ok(and(&a?, &b?)?));
317
318    let mask = match mask {
319        Some(Ok(mask)) => mask,
320        Some(Err(err)) => return Err(err),
321        None => return Ok(partitions),
322    };
323
324    // Don't retain partitions that evaluated to null
325    let prepared = match mask.null_count() {
326        0 => mask,
327        _ => prep_null_mask_filter(&mask),
328    };
329
330    // Sanity check
331    assert_eq!(prepared.len(), partitions.len());
332
333    let filtered = partitions
334        .into_iter()
335        .zip(prepared.values())
336        .filter_map(|(p, f)| f.then_some(p))
337        .collect();
338
339    Ok(filtered)
340}
341
/// The literal value(s) a partition column is equated with (`=`) in the filters
/// examined by `populate_partition_values`.
#[derive(Debug)]
enum PartitionValue {
    /// The column was equated with a literal exactly once; holds the literal's
    /// `Display` string.
    Single(String),
    /// The column was equated with a literal more than once, so no single value
    /// is assumed for it.
    Multi,
}
347
348fn populate_partition_values<'a>(
349    partition_values: &mut HashMap<&'a str, PartitionValue>,
350    filter: &'a Expr,
351) {
352    if let Expr::BinaryExpr(BinaryExpr {
353        ref left,
354        op,
355        ref right,
356    }) = filter
357    {
358        match op {
359            Operator::Eq => match (left.as_ref(), right.as_ref()) {
360                (Expr::Column(Column { ref name, .. }), Expr::Literal(val, _))
361                | (Expr::Literal(val, _), Expr::Column(Column { ref name, .. })) => {
362                    if partition_values
363                        .insert(name, PartitionValue::Single(val.to_string()))
364                        .is_some()
365                    {
366                        partition_values.insert(name, PartitionValue::Multi);
367                    }
368                }
369                _ => {}
370            },
371            Operator::And => {
372                populate_partition_values(partition_values, left);
373                populate_partition_values(partition_values, right);
374            }
375            _ => {}
376        }
377    }
378}
379
380pub fn evaluate_partition_prefix<'a>(
381    partition_cols: &'a [(String, DataType)],
382    filters: &'a [Expr],
383) -> Option<Path> {
384    let mut partition_values = HashMap::new();
385    for filter in filters {
386        populate_partition_values(&mut partition_values, filter);
387    }
388
389    if partition_values.is_empty() {
390        return None;
391    }
392
393    let mut parts = vec![];
394    for (p, _) in partition_cols {
395        match partition_values.get(p.as_str()) {
396            Some(PartitionValue::Single(val)) => {
397                // if a partition only has a single literal value, then it can be added to the
398                // prefix
399                parts.push(format!("{p}={val}"));
400            }
401            _ => {
402                // break on the first unconstrainted partition to create a common prefix
403                // for all covered partitions.
404                break;
405            }
406        }
407    }
408
409    if parts.is_empty() {
410        None
411    } else {
412        Some(Path::from_iter(parts))
413    }
414}
415
416/// Discover the partitions on the given path and prune out files
417/// that belong to irrelevant partitions using `filters` expressions.
418/// `filters` should only contain expressions that can be evaluated
419/// using only the partition columns.
420pub async fn pruned_partition_list<'a>(
421    ctx: &'a dyn Session,
422    store: &'a dyn ObjectStore,
423    table_path: &'a ListingTableUrl,
424    filters: &'a [Expr],
425    file_extension: &'a str,
426    partition_cols: &'a [(String, DataType)],
427) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
428    // if no partition col => simply list all the files
429    if partition_cols.is_empty() {
430        if !filters.is_empty() {
431            return internal_err!(
432                "Got partition filters for unpartitioned table {}",
433                table_path
434            );
435        }
436        return Ok(Box::pin(
437            table_path
438                .list_all_files(ctx, store, file_extension)
439                .await?
440                .try_filter(|object_meta| futures::future::ready(object_meta.size > 0))
441                .map_ok(|object_meta| object_meta.into()),
442        ));
443    }
444
445    let partition_prefix = evaluate_partition_prefix(partition_cols, filters);
446
447    let partitions =
448        list_partitions(store, table_path, partition_cols.len(), partition_prefix)
449            .await?;
450    debug!("Listed {} partitions", partitions.len());
451
452    let pruned =
453        prune_partitions(table_path, partitions, filters, partition_cols).await?;
454
455    debug!("Pruning yielded {} partitions", pruned.len());
456
457    let stream = futures::stream::iter(pruned)
458        .map(move |partition: Partition| async move {
459            let cols = partition_cols.iter().map(|x| x.0.as_str());
460            let parsed = parse_partitions_for_path(table_path, &partition.path, cols);
461
462            let partition_values = parsed
463                .into_iter()
464                .flatten()
465                .zip(partition_cols)
466                .map(|(parsed, (_, datatype))| {
467                    ScalarValue::try_from_string(parsed.to_string(), datatype)
468                })
469                .collect::<Result<Vec<_>>>()?;
470
471            let files = match partition.files {
472                Some(files) => files,
473                None => {
474                    trace!("Recursively listing partition {}", partition.path);
475                    store.list(Some(&partition.path)).try_collect().await?
476                }
477            };
478            let files = files.into_iter().filter(move |o| {
479                let extension_match = o.location.as_ref().ends_with(file_extension);
480                // here need to scan subdirectories(`listing_table_ignore_subdirectory` = false)
481                let glob_match = table_path.contains(&o.location, false);
482                extension_match && glob_match
483            });
484
485            let stream = futures::stream::iter(files.map(move |object_meta| {
486                Ok(PartitionedFile {
487                    object_meta,
488                    partition_values: partition_values.clone(),
489                    range: None,
490                    statistics: None,
491                    extensions: None,
492                    metadata_size_hint: None,
493                })
494            }));
495
496            Ok::<_, DataFusionError>(stream)
497        })
498        .buffer_unordered(CONCURRENCY_LIMIT)
499        .try_flatten()
500        .boxed();
501    Ok(stream)
502}
503
504/// Extract the partition values for the given `file_path` (in the given `table_path`)
505/// associated to the partitions defined by `table_partition_cols`
506pub fn parse_partitions_for_path<'a, I>(
507    table_path: &ListingTableUrl,
508    file_path: &'a Path,
509    table_partition_cols: I,
510) -> Option<Vec<&'a str>>
511where
512    I: IntoIterator<Item = &'a str>,
513{
514    let subpath = table_path.strip_prefix(file_path)?;
515
516    let mut part_values = vec![];
517    for (part, expected_partition) in subpath.zip(table_partition_cols) {
518        match part.split_once('=') {
519            Some((name, val)) if name == expected_partition => part_values.push(val),
520            _ => {
521                debug!(
522                    "Ignoring file: file_path='{file_path}', table_path='{table_path}', part='{part}', partition_col='{expected_partition}'",
523                );
524                return None;
525            }
526        }
527    }
528    Some(part_values)
529}
530/// Describe a partition as a (path, depth, files) tuple for easier assertions
531pub fn describe_partition(partition: &Partition) -> (&str, usize, Vec<&str>) {
532    (
533        partition.path.as_ref(),
534        partition.depth,
535        partition
536            .files
537            .as_ref()
538            .map(|f| f.iter().map(|f| f.location.filename().unwrap()).collect())
539            .unwrap_or_default(),
540    )
541}
542
543#[cfg(test)]
544mod tests {
545    use async_trait::async_trait;
546    use datafusion_common::config::TableOptions;
547    use datafusion_datasource::file_groups::FileGroup;
548    use datafusion_execution::config::SessionConfig;
549    use datafusion_execution::runtime_env::RuntimeEnv;
550    use futures::FutureExt;
551    use object_store::memory::InMemory;
552    use std::any::Any;
553    use std::ops::Not;
554
555    use super::*;
556    use datafusion_expr::{
557        case, col, lit, AggregateUDF, Expr, LogicalPlan, ScalarUDF, WindowUDF,
558    };
559    use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
560    use datafusion_physical_plan::ExecutionPlan;
561
562    #[test]
563    fn test_split_files() {
564        let new_partitioned_file = |path: &str| PartitionedFile::new(path.to_owned(), 10);
565        let files = FileGroup::new(vec![
566            new_partitioned_file("a"),
567            new_partitioned_file("b"),
568            new_partitioned_file("c"),
569            new_partitioned_file("d"),
570            new_partitioned_file("e"),
571        ]);
572
573        let chunks = files.clone().split_files(1);
574        assert_eq!(1, chunks.len());
575        assert_eq!(5, chunks[0].len());
576
577        let chunks = files.clone().split_files(2);
578        assert_eq!(2, chunks.len());
579        assert_eq!(3, chunks[0].len());
580        assert_eq!(2, chunks[1].len());
581
582        let chunks = files.clone().split_files(5);
583        assert_eq!(5, chunks.len());
584        assert_eq!(1, chunks[0].len());
585        assert_eq!(1, chunks[1].len());
586        assert_eq!(1, chunks[2].len());
587        assert_eq!(1, chunks[3].len());
588        assert_eq!(1, chunks[4].len());
589
590        let chunks = files.clone().split_files(123);
591        assert_eq!(5, chunks.len());
592        assert_eq!(1, chunks[0].len());
593        assert_eq!(1, chunks[1].len());
594        assert_eq!(1, chunks[2].len());
595        assert_eq!(1, chunks[3].len());
596        assert_eq!(1, chunks[4].len());
597
598        let empty_group = FileGroup::default();
599        let chunks = empty_group.split_files(2);
600        assert_eq!(0, chunks.len());
601    }
602
603    #[tokio::test]
604    async fn test_pruned_partition_list_empty() {
605        let (store, state) = make_test_store_and_state(&[
606            ("tablepath/mypartition=val1/notparquetfile", 100),
607            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
608            ("tablepath/file.parquet", 100),
609            ("tablepath/notapartition/file.parquet", 100),
610            ("tablepath/notmypartition=val1/file.parquet", 100),
611        ]);
612        let filter = Expr::eq(col("mypartition"), lit("val1"));
613        let pruned = pruned_partition_list(
614            state.as_ref(),
615            store.as_ref(),
616            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
617            &[filter],
618            ".parquet",
619            &[(String::from("mypartition"), DataType::Utf8)],
620        )
621        .await
622        .expect("partition pruning failed")
623        .collect::<Vec<_>>()
624        .await;
625
626        assert_eq!(pruned.len(), 0);
627    }
628
629    #[tokio::test]
630    async fn test_pruned_partition_list() {
631        let (store, state) = make_test_store_and_state(&[
632            ("tablepath/mypartition=val1/file.parquet", 100),
633            ("tablepath/mypartition=val2/file.parquet", 100),
634            ("tablepath/mypartition=val1/ignoresemptyfile.parquet", 0),
635            ("tablepath/mypartition=val1/other=val3/file.parquet", 100),
636            ("tablepath/notapartition/file.parquet", 100),
637            ("tablepath/notmypartition=val1/file.parquet", 100),
638        ]);
639        let filter = Expr::eq(col("mypartition"), lit("val1"));
640        let pruned = pruned_partition_list(
641            state.as_ref(),
642            store.as_ref(),
643            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
644            &[filter],
645            ".parquet",
646            &[(String::from("mypartition"), DataType::Utf8)],
647        )
648        .await
649        .expect("partition pruning failed")
650        .try_collect::<Vec<_>>()
651        .await
652        .unwrap();
653
654        assert_eq!(pruned.len(), 2);
655        let f1 = &pruned[0];
656        assert_eq!(
657            f1.object_meta.location.as_ref(),
658            "tablepath/mypartition=val1/file.parquet"
659        );
660        assert_eq!(&f1.partition_values, &[ScalarValue::from("val1")]);
661        let f2 = &pruned[1];
662        assert_eq!(
663            f2.object_meta.location.as_ref(),
664            "tablepath/mypartition=val1/other=val3/file.parquet"
665        );
666        assert_eq!(f2.partition_values, &[ScalarValue::from("val1"),]);
667    }
668
669    #[tokio::test]
670    async fn test_pruned_partition_list_multi() {
671        let (store, state) = make_test_store_and_state(&[
672            ("tablepath/part1=p1v1/file.parquet", 100),
673            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
674            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
675            ("tablepath/part1=p1v3/part2=p2v1/file2.parquet", 100),
676            ("tablepath/part1=p1v2/part2=p2v2/file2.parquet", 100),
677        ]);
678        let filter1 = Expr::eq(col("part1"), lit("p1v2"));
679        let filter2 = Expr::eq(col("part2"), lit("p2v1"));
680        let pruned = pruned_partition_list(
681            state.as_ref(),
682            store.as_ref(),
683            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
684            &[filter1, filter2],
685            ".parquet",
686            &[
687                (String::from("part1"), DataType::Utf8),
688                (String::from("part2"), DataType::Utf8),
689            ],
690        )
691        .await
692        .expect("partition pruning failed")
693        .try_collect::<Vec<_>>()
694        .await
695        .unwrap();
696
697        assert_eq!(pruned.len(), 2);
698        let f1 = &pruned[0];
699        assert_eq!(
700            f1.object_meta.location.as_ref(),
701            "tablepath/part1=p1v2/part2=p2v1/file1.parquet"
702        );
703        assert_eq!(
704            &f1.partition_values,
705            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1"),]
706        );
707        let f2 = &pruned[1];
708        assert_eq!(
709            f2.object_meta.location.as_ref(),
710            "tablepath/part1=p1v2/part2=p2v1/file2.parquet"
711        );
712        assert_eq!(
713            &f2.partition_values,
714            &[ScalarValue::from("p1v2"), ScalarValue::from("p2v1")]
715        );
716    }
717
718    #[tokio::test]
719    async fn test_list_partition() {
720        let (store, _) = make_test_store_and_state(&[
721            ("tablepath/part1=p1v1/file.parquet", 100),
722            ("tablepath/part1=p1v2/part2=p2v1/file1.parquet", 100),
723            ("tablepath/part1=p1v2/part2=p2v1/file2.parquet", 100),
724            ("tablepath/part1=p1v3/part2=p2v1/file3.parquet", 100),
725            ("tablepath/part1=p1v2/part2=p2v2/file4.parquet", 100),
726            ("tablepath/part1=p1v2/part2=p2v2/empty.parquet", 0),
727        ]);
728
729        let partitions = list_partitions(
730            store.as_ref(),
731            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
732            0,
733            None,
734        )
735        .await
736        .expect("listing partitions failed");
737
738        assert_eq!(
739            &partitions
740                .iter()
741                .map(describe_partition)
742                .collect::<Vec<_>>(),
743            &vec![
744                ("tablepath", 0, vec![]),
745                ("tablepath/part1=p1v1", 1, vec![]),
746                ("tablepath/part1=p1v2", 1, vec![]),
747                ("tablepath/part1=p1v3", 1, vec![]),
748            ]
749        );
750
751        let partitions = list_partitions(
752            store.as_ref(),
753            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
754            1,
755            None,
756        )
757        .await
758        .expect("listing partitions failed");
759
760        assert_eq!(
761            &partitions
762                .iter()
763                .map(describe_partition)
764                .collect::<Vec<_>>(),
765            &vec![
766                ("tablepath", 0, vec![]),
767                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
768                ("tablepath/part1=p1v2", 1, vec![]),
769                ("tablepath/part1=p1v2/part2=p2v1", 2, vec![]),
770                ("tablepath/part1=p1v2/part2=p2v2", 2, vec![]),
771                ("tablepath/part1=p1v3", 1, vec![]),
772                ("tablepath/part1=p1v3/part2=p2v1", 2, vec![]),
773            ]
774        );
775
776        let partitions = list_partitions(
777            store.as_ref(),
778            &ListingTableUrl::parse("file:///tablepath/").unwrap(),
779            2,
780            None,
781        )
782        .await
783        .expect("listing partitions failed");
784
785        assert_eq!(
786            &partitions
787                .iter()
788                .map(describe_partition)
789                .collect::<Vec<_>>(),
790            &vec![
791                ("tablepath", 0, vec![]),
792                ("tablepath/part1=p1v1", 1, vec!["file.parquet"]),
793                ("tablepath/part1=p1v2", 1, vec![]),
794                ("tablepath/part1=p1v3", 1, vec![]),
795                (
796                    "tablepath/part1=p1v2/part2=p2v1",
797                    2,
798                    vec!["file1.parquet", "file2.parquet"]
799                ),
800                ("tablepath/part1=p1v2/part2=p2v2", 2, vec!["file4.parquet"]),
801                ("tablepath/part1=p1v3/part2=p2v1", 2, vec!["file3.parquet"]),
802            ]
803        );
804    }
805
806    #[test]
807    fn test_parse_partitions_for_path() {
808        assert_eq!(
809            Some(vec![]),
810            parse_partitions_for_path(
811                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
812                &Path::from("bucket/mytable/file.csv"),
813                vec![]
814            )
815        );
816        assert_eq!(
817            None,
818            parse_partitions_for_path(
819                &ListingTableUrl::parse("file:///bucket/othertable").unwrap(),
820                &Path::from("bucket/mytable/file.csv"),
821                vec![]
822            )
823        );
824        assert_eq!(
825            None,
826            parse_partitions_for_path(
827                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
828                &Path::from("bucket/mytable/file.csv"),
829                vec!["mypartition"]
830            )
831        );
832        assert_eq!(
833            Some(vec!["v1"]),
834            parse_partitions_for_path(
835                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
836                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
837                vec!["mypartition"]
838            )
839        );
840        assert_eq!(
841            Some(vec!["v1"]),
842            parse_partitions_for_path(
843                &ListingTableUrl::parse("file:///bucket/mytable/").unwrap(),
844                &Path::from("bucket/mytable/mypartition=v1/file.csv"),
845                vec!["mypartition"]
846            )
847        );
848        // Only hive style partitioning supported for now:
849        assert_eq!(
850            None,
851            parse_partitions_for_path(
852                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
853                &Path::from("bucket/mytable/v1/file.csv"),
854                vec!["mypartition"]
855            )
856        );
857        assert_eq!(
858            Some(vec!["v1", "v2"]),
859            parse_partitions_for_path(
860                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
861                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
862                vec!["mypartition", "otherpartition"]
863            )
864        );
865        assert_eq!(
866            Some(vec!["v1"]),
867            parse_partitions_for_path(
868                &ListingTableUrl::parse("file:///bucket/mytable").unwrap(),
869                &Path::from("bucket/mytable/mypartition=v1/otherpartition=v2/file.csv"),
870                vec!["mypartition"]
871            )
872        );
873    }
874
875    #[test]
876    fn test_expr_applicable_for_cols() {
877        assert!(expr_applicable_for_cols(
878            &["c1"],
879            &Expr::eq(col("c1"), lit("value"))
880        ));
881        assert!(!expr_applicable_for_cols(
882            &["c1"],
883            &Expr::eq(col("c2"), lit("value"))
884        ));
885        assert!(!expr_applicable_for_cols(
886            &["c1"],
887            &Expr::eq(col("c1"), col("c2"))
888        ));
889        assert!(expr_applicable_for_cols(
890            &["c1", "c2"],
891            &Expr::eq(col("c1"), col("c2"))
892        ));
893        assert!(expr_applicable_for_cols(
894            &["c1", "c2"],
895            &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
896        ));
897        assert!(expr_applicable_for_cols(
898            &["c1", "c2"],
899            &(case(col("c1"))
900                .when(lit("v1"), lit(true))
901                .otherwise(lit(false))
902                .expect("valid case expr"))
903        ));
904        // static expression not relevant in this context but we
905        // test it as an edge case anyway in case we want to generalize
906        // this helper function
907        assert!(expr_applicable_for_cols(&[], &lit(true)));
908    }
909
910    #[test]
911    fn test_evaluate_partition_prefix() {
912        let partitions = &[
913            ("a".to_string(), DataType::Utf8),
914            ("b".to_string(), DataType::Int16),
915            ("c".to_string(), DataType::Boolean),
916        ];
917
918        assert_eq!(
919            evaluate_partition_prefix(partitions, &[col("a").eq(lit("foo"))]),
920            Some(Path::from("a=foo")),
921        );
922
923        assert_eq!(
924            evaluate_partition_prefix(partitions, &[lit("foo").eq(col("a"))]),
925            Some(Path::from("a=foo")),
926        );
927
928        assert_eq!(
929            evaluate_partition_prefix(
930                partitions,
931                &[col("a").eq(lit("foo")).and(col("b").eq(lit("bar")))],
932            ),
933            Some(Path::from("a=foo/b=bar")),
934        );
935
936        assert_eq!(
937            evaluate_partition_prefix(
938                partitions,
939                // list of filters should be evaluated as AND
940                &[col("a").eq(lit("foo")), col("b").eq(lit("bar")),],
941            ),
942            Some(Path::from("a=foo/b=bar")),
943        );
944
945        assert_eq!(
946            evaluate_partition_prefix(
947                partitions,
948                &[col("a")
949                    .eq(lit("foo"))
950                    .and(col("b").eq(lit("1")))
951                    .and(col("c").eq(lit("true")))],
952            ),
953            Some(Path::from("a=foo/b=1/c=true")),
954        );
955
956        // no prefix when filter is empty
957        assert_eq!(evaluate_partition_prefix(partitions, &[]), None);
958
959        // b=foo results in no prefix because a is not restricted
960        assert_eq!(
961            evaluate_partition_prefix(partitions, &[Expr::eq(col("b"), lit("foo"))]),
962            None,
963        );
964
965        // a=foo and c=baz only results in preifx a=foo because b is not restricted
966        assert_eq!(
967            evaluate_partition_prefix(
968                partitions,
969                &[col("a").eq(lit("foo")).and(col("c").eq(lit("baz")))],
970            ),
971            Some(Path::from("a=foo")),
972        );
973
974        // partition with multiple values results in no prefix
975        assert_eq!(
976            evaluate_partition_prefix(
977                partitions,
978                &[Expr::and(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
979            ),
980            None,
981        );
982
983        // no prefix because partition a is not restricted to a single literal
984        assert_eq!(
985            evaluate_partition_prefix(
986                partitions,
987                &[Expr::or(col("a").eq(lit("foo")), col("a").eq(lit("bar")))],
988            ),
989            None,
990        );
991        assert_eq!(
992            evaluate_partition_prefix(partitions, &[col("b").lt(lit(5))],),
993            None,
994        );
995    }
996
997    #[test]
998    fn test_evaluate_date_partition_prefix() {
999        let partitions = &[("a".to_string(), DataType::Date32)];
1000        assert_eq!(
1001            evaluate_partition_prefix(
1002                partitions,
1003                &[col("a").eq(Expr::Literal(ScalarValue::Date32(Some(3)), None))],
1004            ),
1005            Some(Path::from("a=1970-01-04")),
1006        );
1007
1008        let partitions = &[("a".to_string(), DataType::Date64)];
1009        assert_eq!(
1010            evaluate_partition_prefix(
1011                partitions,
1012                &[col("a").eq(Expr::Literal(
1013                    ScalarValue::Date64(Some(4 * 24 * 60 * 60 * 1000)),
1014                    None
1015                )),],
1016            ),
1017            Some(Path::from("a=1970-01-05")),
1018        );
1019    }
1020
1021    pub fn make_test_store_and_state(
1022        files: &[(&str, u64)],
1023    ) -> (Arc<InMemory>, Arc<dyn Session>) {
1024        let memory = InMemory::new();
1025
1026        for (name, size) in files {
1027            memory
1028                .put(&Path::from(*name), vec![0; *size as usize].into())
1029                .now_or_never()
1030                .unwrap()
1031                .unwrap();
1032        }
1033
1034        (Arc::new(memory), Arc::new(MockSession {}))
1035    }
1036
1037    struct MockSession {}
1038
1039    #[async_trait]
1040    impl Session for MockSession {
1041        fn session_id(&self) -> &str {
1042            unimplemented!()
1043        }
1044
1045        fn config(&self) -> &SessionConfig {
1046            unimplemented!()
1047        }
1048
1049        async fn create_physical_plan(
1050            &self,
1051            _logical_plan: &LogicalPlan,
1052        ) -> Result<Arc<dyn ExecutionPlan>> {
1053            unimplemented!()
1054        }
1055
1056        fn create_physical_expr(
1057            &self,
1058            _expr: Expr,
1059            _df_schema: &DFSchema,
1060        ) -> Result<Arc<dyn PhysicalExpr>> {
1061            unimplemented!()
1062        }
1063
1064        fn scalar_functions(&self) -> &std::collections::HashMap<String, Arc<ScalarUDF>> {
1065            unimplemented!()
1066        }
1067
1068        fn aggregate_functions(
1069            &self,
1070        ) -> &std::collections::HashMap<String, Arc<AggregateUDF>> {
1071            unimplemented!()
1072        }
1073
1074        fn window_functions(&self) -> &std::collections::HashMap<String, Arc<WindowUDF>> {
1075            unimplemented!()
1076        }
1077
1078        fn runtime_env(&self) -> &Arc<RuntimeEnv> {
1079            unimplemented!()
1080        }
1081
1082        fn execution_props(&self) -> &ExecutionProps {
1083            unimplemented!()
1084        }
1085
1086        fn as_any(&self) -> &dyn Any {
1087            unimplemented!()
1088        }
1089
1090        fn table_options(&self) -> &TableOptions {
1091            unimplemented!()
1092        }
1093
1094        fn table_options_mut(&mut self) -> &mut TableOptions {
1095            unimplemented!()
1096        }
1097
1098        fn task_ctx(&self) -> Arc<datafusion_execution::TaskContext> {
1099            unimplemented!()
1100        }
1101    }
1102}