datafusion_datasource_parquet/
page_filter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Contains code to filter entire pages
19
20use std::collections::HashSet;
21use std::sync::Arc;
22
23use super::metrics::ParquetFileMetrics;
24use crate::ParquetAccessPlan;
25
26use arrow::array::BooleanArray;
27use arrow::{
28    array::ArrayRef,
29    datatypes::{Schema, SchemaRef},
30};
31use datafusion_common::pruning::PruningStatistics;
32use datafusion_common::ScalarValue;
33use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
34use datafusion_pruning::PruningPredicate;
35
36use log::{debug, trace};
37use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
38use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
39use parquet::file::page_index::offset_index::PageLocation;
40use parquet::schema::types::SchemaDescriptor;
41use parquet::{
42    arrow::arrow_reader::{RowSelection, RowSelector},
43    file::metadata::{ParquetMetaData, RowGroupMetaData},
44};
45
46/// Filters a [`ParquetAccessPlan`] based on the [Parquet PageIndex], if present
47///
48/// It does so by evaluating statistics from the [`ParquetColumnIndex`] and
49/// [`ParquetOffsetIndex`] and converting them to [`RowSelection`].
50///
51/// [Parquet PageIndex]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
52///
/// For example, given a row group with two columns (column chunks) `A`
/// and `B` with the following page level statistics:
55///
56/// ```text
57/// ┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
58///    ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
59/// ┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
60/// ┃  │  │              │     │  │              │     ┃
61/// ┃     │              │  │     │     Page     │  │
62///    │  │              │     │  │      3       │     ┃
63/// ┃     │              │  │     │   min: "A"   │  │  ┃
64/// ┃  │  │              │     │  │   max: "C"   │     ┃
65/// ┃     │     Page     │  │     │ first_row: 0 │  │
66///    │  │      1       │     │  │              │     ┃
67/// ┃     │   min: 10    │  │     └──────────────┘  │  ┃
68/// ┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
69/// ┃     │ first_row: 0 │  │     │              │  │
70///    │  │              │     │  │     Page     │     ┃
71/// ┃     │              │  │     │      4       │  │  ┃
72/// ┃  │  │              │     │  │   min: "D"   │     ┃
73/// ┃     │              │  │     │   max: "G"   │  │
74///    │  │              │     │  │first_row: 100│     ┃
75/// ┃     └──────────────┘  │     │              │  │  ┃
76/// ┃  │  ┌──────────────┐     │  │              │     ┃
77/// ┃     │              │  │     └──────────────┘  │
78///    │  │     Page     │     │  ┌──────────────┐     ┃
79/// ┃     │      2       │  │     │              │  │  ┃
80/// ┃  │  │   min: 30    │     │  │     Page     │     ┃
81/// ┃     │   max: 40    │  │     │      5       │  │
82///    │  │first_row: 200│     │  │   min: "H"   │     ┃
83/// ┃     │              │  │     │   max: "Z"   │  │  ┃
84/// ┃  │  │              │     │  │first_row: 250│     ┃
85/// ┃     └──────────────┘  │     │              │  │
86///    │                       │  └──────────────┘     ┃
87/// ┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
88/// ┃       ColumnChunk            ColumnChunk         ┃
89/// ┃            A                      B
90///  ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
91///
92///   Total rows: 300
93/// ```
94///
95/// Given the predicate `A > 35 AND B = 'F'`:
96///
/// Using `A > 35`: can rule out all values in Page 1 (rows 0 -> 199)
98///
99/// Using `B = 'F'`: can rule out all values in Page 3 and Page 5 (rows 0 -> 99, and 250 -> 299)
100///
/// So we can entirely skip rows 0->199 and 250->299, as we know they
/// cannot contain rows that match the predicate.
103///
104/// # Implementation notes
105///
/// Single column predicates are evaluated using the PageIndex information
/// for that column to determine which row ranges can be skipped. Predicates
/// that reference more than one column are ignored.
///
/// The resulting [`RowSelection`]s are intersected into a final
/// row selection that is added to the [`ParquetAccessPlan`].
#[derive(Debug)]
pub struct PagePruningAccessPlanFilter {
    /// Single column predicates (e.g. `col = 5`) extracted from the overall
    /// predicate. Must all be true for a row to be included in the result.
    ///
    /// `new` only keeps predicates that could be built, are not always true,
    /// and reference a single column.
    predicates: Vec<PruningPredicate>,
}
117
118impl PagePruningAccessPlanFilter {
119    /// Create a new [`PagePruningAccessPlanFilter`] from a physical
120    /// expression.
121    pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
122        // extract any single column predicates
123        let predicates = split_conjunction(expr)
124            .into_iter()
125            .filter_map(|predicate| {
126                let pp = match PruningPredicate::try_new(
127                    Arc::clone(predicate),
128                    Arc::clone(&schema),
129                ) {
130                    Ok(pp) => pp,
131                    Err(e) => {
132                        debug!("Ignoring error creating page pruning predicate: {e}");
133                        return None;
134                    }
135                };
136
137                if pp.always_true() {
138                    debug!("Ignoring always true page pruning predicate: {predicate}");
139                    return None;
140                }
141
142                if pp.required_columns().single_column().is_none() {
143                    debug!("Ignoring multi-column page pruning predicate: {predicate}");
144                    return None;
145                }
146
147                Some(pp)
148            })
149            .collect::<Vec<_>>();
150        Self { predicates }
151    }
152
153    /// Returns an updated [`ParquetAccessPlan`] by applying predicates to the
154    /// parquet page index, if any
155    pub fn prune_plan_with_page_index(
156        &self,
157        mut access_plan: ParquetAccessPlan,
158        arrow_schema: &Schema,
159        parquet_schema: &SchemaDescriptor,
160        parquet_metadata: &ParquetMetaData,
161        file_metrics: &ParquetFileMetrics,
162    ) -> ParquetAccessPlan {
163        // scoped timer updates on drop
164        let _timer_guard = file_metrics.page_index_eval_time.timer();
165        if self.predicates.is_empty() {
166            return access_plan;
167        }
168
169        let page_index_predicates = &self.predicates;
170        let groups = parquet_metadata.row_groups();
171
172        if groups.is_empty() {
173            return access_plan;
174        }
175
176        if parquet_metadata.offset_index().is_none()
177            || parquet_metadata.column_index().is_none()
178        {
179            debug!(
180                    "Can not prune pages due to lack of indexes. Have offset: {}, column index: {}",
181                    parquet_metadata.offset_index().is_some(), parquet_metadata.column_index().is_some()
182                );
183            return access_plan;
184        };
185
186        // track the total number of rows that should be skipped
187        let mut total_skip = 0;
188        // track the total number of rows that should not be skipped
189        let mut total_select = 0;
190
191        // for each row group specified in the access plan
192        let row_group_indexes = access_plan.row_group_indexes();
193        for row_group_index in row_group_indexes {
194            // The selection for this particular row group
195            let mut overall_selection = None;
196            for predicate in page_index_predicates {
197                let column = predicate
198                    .required_columns()
199                    .single_column()
200                    .expect("Page pruning requires single column predicates");
201
202                let converter = StatisticsConverter::try_new(
203                    column.name(),
204                    arrow_schema,
205                    parquet_schema,
206                );
207
208                let converter = match converter {
209                    Ok(converter) => converter,
210                    Err(e) => {
211                        debug!(
212                            "Could not create statistics converter for column {}: {e}",
213                            column.name()
214                        );
215                        continue;
216                    }
217                };
218
219                let selection = prune_pages_in_one_row_group(
220                    row_group_index,
221                    predicate,
222                    converter,
223                    parquet_metadata,
224                    file_metrics,
225                );
226
227                let Some(selection) = selection else {
228                    trace!("No pages pruned in prune_pages_in_one_row_group");
229                    continue;
230                };
231
232                debug!("Use filter and page index to create RowSelection {:?} from predicate: {:?}",
233                    &selection,
234                    predicate.predicate_expr(),
235                );
236
237                overall_selection = update_selection(overall_selection, selection);
238
239                // if the overall selection has ruled out all rows, no need to
240                // continue with the other predicates
241                let selects_any = overall_selection
242                    .as_ref()
243                    .map(|selection| selection.selects_any())
244                    .unwrap_or(true);
245
246                if !selects_any {
247                    break;
248                }
249            }
250
251            if let Some(overall_selection) = overall_selection {
252                let rows_selected = overall_selection.row_count();
253                if rows_selected > 0 {
254                    let rows_skipped = overall_selection.skipped_row_count();
255                    trace!("Overall selection from predicate skipped {rows_skipped}, selected {rows_selected}: {overall_selection:?}");
256                    total_skip += rows_skipped;
257                    total_select += rows_selected;
258                    access_plan.scan_selection(row_group_index, overall_selection)
259                } else {
260                    // Selection skips all rows, so skip the entire row group
261                    let rows_skipped = groups[row_group_index].num_rows() as usize;
262                    access_plan.skip(row_group_index);
263                    total_skip += rows_skipped;
264                    trace!(
265                        "Overall selection from predicate is empty, \
266                        skipping all {rows_skipped} rows in row group {row_group_index}"
267                    );
268                }
269            }
270        }
271
272        file_metrics.page_index_rows_pruned.add_pruned(total_skip);
273        file_metrics
274            .page_index_rows_pruned
275            .add_matched(total_select);
276        access_plan
277    }
278
279    /// Returns the number of filters in the [`PagePruningAccessPlanFilter`]
280    pub fn filter_number(&self) -> usize {
281        self.predicates.len()
282    }
283}
284
285fn update_selection(
286    current_selection: Option<RowSelection>,
287    row_selection: RowSelection,
288) -> Option<RowSelection> {
289    match current_selection {
290        None => Some(row_selection),
291        Some(current_selection) => Some(current_selection.intersection(&row_selection)),
292    }
293}
294
295/// Returns a [`RowSelection`] for the rows in this row group to scan.
296///
297/// This Row Selection is formed from the page index and the predicate skips row
298/// ranges that can be ruled out based on the predicate.
299///
300/// Returns `None` if there is an error evaluating the predicate or the required
301/// page information is not present.
302fn prune_pages_in_one_row_group(
303    row_group_index: usize,
304    pruning_predicate: &PruningPredicate,
305    converter: StatisticsConverter<'_>,
306    parquet_metadata: &ParquetMetaData,
307    metrics: &ParquetFileMetrics,
308) -> Option<RowSelection> {
309    let pruning_stats =
310        PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?;
311
312    // Each element in values is a boolean indicating whether the page may have
313    // values that match the predicate (true) or could not possibly have values
314    // that match the predicate (false).
315    let values = match pruning_predicate.prune(&pruning_stats) {
316        Ok(values) => values,
317        Err(e) => {
318            debug!("Error evaluating page index predicate values {e}");
319            metrics.predicate_evaluation_errors.add(1);
320            return None;
321        }
322    };
323
324    // Convert the information of which pages to skip into a RowSelection
325    // that describes the ranges of rows to skip.
326    let Some(page_row_counts) = pruning_stats.page_row_counts() else {
327        debug!(
328            "Can not determine page row counts for row group {row_group_index}, skipping"
329        );
330        metrics.predicate_evaluation_errors.add(1);
331        return None;
332    };
333
334    let mut vec = Vec::with_capacity(values.len());
335    assert_eq!(page_row_counts.len(), values.len());
336    let mut sum_row = *page_row_counts.first().unwrap();
337    let mut selected = *values.first().unwrap();
338    trace!("Pruned to {values:?} using {pruning_stats:?}");
339    for (i, &f) in values.iter().enumerate().skip(1) {
340        if f == selected {
341            sum_row += *page_row_counts.get(i).unwrap();
342        } else {
343            let selector = if selected {
344                RowSelector::select(sum_row)
345            } else {
346                RowSelector::skip(sum_row)
347            };
348            vec.push(selector);
349            sum_row = *page_row_counts.get(i).unwrap();
350            selected = f;
351        }
352    }
353
354    let selector = if selected {
355        RowSelector::select(sum_row)
356    } else {
357        RowSelector::skip(sum_row)
358    };
359    vec.push(selector);
360    Some(RowSelection::from(vec))
361}
362
363/// Implement [`PruningStatistics`] for one column's PageIndex (column_index + offset_index)
364#[derive(Debug)]
365struct PagesPruningStatistics<'a> {
366    row_group_index: usize,
367    row_group_metadatas: &'a [RowGroupMetaData],
368    converter: StatisticsConverter<'a>,
369    column_index: &'a ParquetColumnIndex,
370    offset_index: &'a ParquetOffsetIndex,
371    page_offsets: &'a Vec<PageLocation>,
372}
373
374impl<'a> PagesPruningStatistics<'a> {
375    /// Creates a new [`PagesPruningStatistics`] for a column in a row group, if
376    /// possible.
377    ///
378    /// Returns None if the `parquet_metadata` does not have sufficient
379    /// information to create the statistics.
380    fn try_new(
381        row_group_index: usize,
382        converter: StatisticsConverter<'a>,
383        parquet_metadata: &'a ParquetMetaData,
384    ) -> Option<Self> {
385        let Some(parquet_column_index) = converter.parquet_column_index() else {
386            trace!(
387                "Column {:?} not in parquet file, skipping",
388                converter.arrow_field()
389            );
390            return None;
391        };
392
393        let column_index = parquet_metadata.column_index()?;
394        let offset_index = parquet_metadata.offset_index()?;
395        let row_group_metadatas = parquet_metadata.row_groups();
396
397        let Some(row_group_page_offsets) = offset_index.get(row_group_index) else {
398            trace!("No page offsets for row group {row_group_index}, skipping");
399            return None;
400        };
401        let Some(offset_index_metadata) =
402            row_group_page_offsets.get(parquet_column_index)
403        else {
404            trace!(
405                "No page offsets for column {:?} in row group {row_group_index}, skipping",
406                converter.arrow_field()
407            );
408            return None;
409        };
410        let page_offsets = offset_index_metadata.page_locations();
411
412        Some(Self {
413            row_group_index,
414            row_group_metadatas,
415            converter,
416            column_index,
417            offset_index,
418            page_offsets,
419        })
420    }
421
422    /// return the row counts in each data page, if possible.
423    fn page_row_counts(&self) -> Option<Vec<usize>> {
424        let row_group_metadata = self
425            .row_group_metadatas
426            .get(self.row_group_index)
427            // fail fast/panic if row_group_index is out of bounds
428            .unwrap();
429
430        let num_rows_in_row_group = row_group_metadata.num_rows() as usize;
431
432        let page_offsets = self.page_offsets;
433        let mut vec = Vec::with_capacity(page_offsets.len());
434        page_offsets.windows(2).for_each(|x| {
435            let start = x[0].first_row_index as usize;
436            let end = x[1].first_row_index as usize;
437            vec.push(end - start);
438        });
439        vec.push(num_rows_in_row_group - page_offsets.last()?.first_row_index as usize);
440        Some(vec)
441    }
442}
443impl PruningStatistics for PagesPruningStatistics<'_> {
444    fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
445        match self.converter.data_page_mins(
446            self.column_index,
447            self.offset_index,
448            [&self.row_group_index],
449        ) {
450            Ok(min_values) => Some(min_values),
451            Err(e) => {
452                debug!("Error evaluating data page min values {e}");
453                None
454            }
455        }
456    }
457
458    fn max_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
459        match self.converter.data_page_maxes(
460            self.column_index,
461            self.offset_index,
462            [&self.row_group_index],
463        ) {
464            Ok(min_values) => Some(min_values),
465            Err(e) => {
466                debug!("Error evaluating data page max values {e}");
467                None
468            }
469        }
470    }
471
472    fn num_containers(&self) -> usize {
473        self.page_offsets.len()
474    }
475
476    fn null_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
477        match self.converter.data_page_null_counts(
478            self.column_index,
479            self.offset_index,
480            [&self.row_group_index],
481        ) {
482            Ok(null_counts) => Some(Arc::new(null_counts)),
483            Err(e) => {
484                debug!("Error evaluating data page null counts {e}");
485                None
486            }
487        }
488    }
489
490    fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
491        match self.converter.data_page_row_counts(
492            self.offset_index,
493            self.row_group_metadatas,
494            [&self.row_group_index],
495        ) {
496            Ok(row_counts) => row_counts.map(|a| Arc::new(a) as ArrayRef),
497            Err(e) => {
498                debug!("Error evaluating data page row counts {e}");
499                None
500            }
501        }
502    }
503
504    fn contained(
505        &self,
506        _column: &datafusion_common::Column,
507        _values: &HashSet<ScalarValue>,
508    ) -> Option<BooleanArray> {
509        None
510    }
511}