datafusion_pruning/
file_pruner.rs1use std::sync::Arc;
21
22use arrow::datatypes::{FieldRef, Schema, SchemaRef};
23use datafusion_common::{
24 pruning::{
25 CompositePruningStatistics, PartitionPruningStatistics, PrunableStatistics,
26 PruningStatistics,
27 },
28 Result,
29};
30use datafusion_datasource::PartitionedFile;
31use datafusion_physical_expr_common::physical_expr::{snapshot_generation, PhysicalExpr};
32use datafusion_physical_plan::metrics::Count;
33use itertools::Itertools;
34use log::debug;
35
36use crate::build_pruning_predicate;
37
/// Decides whether a single [`PartitionedFile`] can be skipped for a given
/// predicate, using the file's partition values and, when present, its
/// file-level statistics.
pub struct FilePruner {
    /// Generation of `predicate` (as reported by `snapshot_generation`) seen by
    /// the most recent `should_prune` call; `None` until the first call.
    predicate_generation: Option<u64>,
    /// Expression from which the pruning predicate is built.
    predicate: Arc<dyn PhysicalExpr>,
    /// Schema used for pruning: the logical file schema's fields followed by
    /// the partition fields, with the logical file schema's metadata.
    pruning_schema: Arc<Schema>,
    /// The file under consideration; its `partition_values` and optional
    /// `statistics` feed the pruning statistics.
    partitioned_file: PartitionedFile,
    /// Fields describing the partition columns; appended to the file fields in
    /// `pruning_schema` and passed alongside the file's partition values.
    partition_fields: Vec<FieldRef>,
    /// Counter handed to `build_pruning_predicate` and incremented when
    /// evaluating the pruning predicate returns an error.
    predicate_creation_errors: Count,
}
49
50impl FilePruner {
51 pub fn new(
52 predicate: Arc<dyn PhysicalExpr>,
53 logical_file_schema: &SchemaRef,
54 partition_fields: Vec<FieldRef>,
55 partitioned_file: PartitionedFile,
56 predicate_creation_errors: Count,
57 ) -> Result<Self> {
58 let pruning_schema = Arc::new(
61 Schema::new(
62 logical_file_schema
63 .fields()
64 .iter()
65 .cloned()
66 .chain(partition_fields.iter().cloned())
67 .collect_vec(),
68 )
69 .with_metadata(logical_file_schema.metadata().clone()),
70 );
71 Ok(Self {
72 predicate_generation: None,
76 predicate,
77 pruning_schema,
78 partitioned_file,
79 partition_fields,
80 predicate_creation_errors,
81 })
82 }
83
84 pub fn should_prune(&mut self) -> Result<bool> {
85 let new_generation = snapshot_generation(&self.predicate);
86 if let Some(current_generation) = self.predicate_generation.as_mut() {
87 if *current_generation == new_generation {
88 return Ok(false);
89 }
90 *current_generation = new_generation;
91 } else {
92 self.predicate_generation = Some(new_generation);
93 }
94 let pruning_predicate = build_pruning_predicate(
95 Arc::clone(&self.predicate),
96 &self.pruning_schema,
97 &self.predicate_creation_errors,
98 );
99 if let Some(pruning_predicate) = pruning_predicate {
100 let mut pruning = Box::new(PartitionPruningStatistics::try_new(
102 vec![self.partitioned_file.partition_values.clone()],
103 self.partition_fields.clone(),
104 )?) as Box<dyn PruningStatistics>;
105 if let Some(stats) = &self.partitioned_file.statistics {
106 let stats_pruning = Box::new(PrunableStatistics::new(
107 vec![Arc::clone(stats)],
108 Arc::clone(&self.pruning_schema),
109 ));
110 pruning = Box::new(CompositePruningStatistics::new(vec![
111 pruning,
112 stats_pruning,
113 ]));
114 }
115 match pruning_predicate.prune(pruning.as_ref()) {
116 Ok(values) => {
117 assert!(values.len() == 1);
118 if values.into_iter().all(|v| !v) {
120 return Ok(true);
121 }
122 }
123 Err(e) => {
125 debug!("Ignoring error building pruning predicate for file: {e}");
126 self.predicate_creation_errors.add(1);
127 }
128 }
129 }
130
131 Ok(false)
132 }
133}