datafusion_pruning/
file_pruner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! File-level pruning based on partition values and file-level statistics
19
20use std::sync::Arc;
21
22use arrow::datatypes::{FieldRef, Schema, SchemaRef};
23use datafusion_common::{
24    pruning::{
25        CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
26        PruningStatistics,
27    },
28    Result,
29};
30use datafusion_datasource::PartitionedFile;
31use datafusion_physical_expr_common::physical_expr::{snapshot_generation, PhysicalExpr};
32use datafusion_physical_plan::metrics::Count;
33use itertools::Itertools;
34use log::debug;
35
36use crate::build_pruning_predicate;
37
38/// Prune based on partition values and file-level statistics.
39pub struct FilePruner {
40    predicate_generation: Option<u64>,
41    predicate: Arc<dyn PhysicalExpr>,
42    /// Schema used for pruning, which combines the file schema and partition fields.
43    /// Partition fields are always at the end, as they are during scans.
44    pruning_schema: Arc<Schema>,
45    partitioned_file: PartitionedFile,
46    partition_fields: Vec<FieldRef>,
47    predicate_creation_errors: Count,
48}
49
50impl FilePruner {
51    pub fn new(
52        predicate: Arc<dyn PhysicalExpr>,
53        logical_file_schema: &SchemaRef,
54        partition_fields: Vec<FieldRef>,
55        partitioned_file: PartitionedFile,
56        predicate_creation_errors: Count,
57    ) -> Result<Self> {
58        // Build a pruning schema that combines the file fields and partition fields.
59        // Partition fields are always at the end.
60        let pruning_schema = Arc::new(
61            Schema::new(
62                logical_file_schema
63                    .fields()
64                    .iter()
65                    .cloned()
66                    .chain(partition_fields.iter().cloned())
67                    .collect_vec(),
68            )
69            .with_metadata(logical_file_schema.metadata().clone()),
70        );
71        Ok(Self {
72            // Initialize the predicate generation to None so that the first time we call `should_prune` we actually check the predicate
73            // Subsequent calls will only do work if the predicate itself has changed.
74            // See `snapshot_generation` for more info.
75            predicate_generation: None,
76            predicate,
77            pruning_schema,
78            partitioned_file,
79            partition_fields,
80            predicate_creation_errors,
81        })
82    }
83
84    pub fn should_prune(&mut self) -> Result<bool> {
85        let new_generation = snapshot_generation(&self.predicate);
86        if let Some(current_generation) = self.predicate_generation.as_mut() {
87            if *current_generation == new_generation {
88                return Ok(false);
89            }
90            *current_generation = new_generation;
91        } else {
92            self.predicate_generation = Some(new_generation);
93        }
94        let pruning_predicate = build_pruning_predicate(
95            Arc::clone(&self.predicate),
96            &self.pruning_schema,
97            &self.predicate_creation_errors,
98        );
99        if let Some(pruning_predicate) = pruning_predicate {
100            // The partition column schema is the schema of the table - the schema of the file
101            let mut pruning = Box::new(PartitionPruningStatistics::try_new(
102                vec![self.partitioned_file.partition_values.clone()],
103                self.partition_fields.clone(),
104            )?) as Box<dyn PruningStatistics>;
105            if let Some(stats) = &self.partitioned_file.statistics {
106                let stats_pruning = Box::new(PrunableStatistics::new(
107                    vec![Arc::clone(stats)],
108                    Arc::clone(&self.pruning_schema),
109                ));
110                pruning = Box::new(CompositePruningStatistics::new(vec![
111                    pruning,
112                    stats_pruning,
113                ]));
114            }
115            match pruning_predicate.prune(pruning.as_ref()) {
116                Ok(values) => {
117                    assert!(values.len() == 1);
118                    // We expect a single container -> if all containers are false skip this file
119                    if values.into_iter().all(|v| !v) {
120                        return Ok(true);
121                    }
122                }
123                // Stats filter array could not be built, so we can't prune
124                Err(e) => {
125                    debug!("Ignoring error building pruning predicate for file: {e}");
126                    self.predicate_creation_errors.add(1);
127                }
128            }
129        }
130
131        Ok(false)
132    }
133}