datafusion_datasource_parquet/
access_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::{internal_err, Result};
19use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
20use parquet::file::metadata::RowGroupMetaData;
21
22/// A selection of rows and row groups within a ParquetFile to decode.
23///
24/// A `ParquetAccessPlan` is used to limit the row groups and data pages a `DataSourceExec`
25/// will read and decode to improve performance.
26///
27/// Note that page level pruning based on ArrowPredicate is applied after all of
28/// these selections
29///
30/// # Example
31///
32/// For example, given a Parquet file with 4 row groups, a `ParquetAccessPlan`
33/// can be used to specify skipping row group 0 and 2, scanning a range of rows
34/// in row group 1, and scanning all rows in row group 3 as follows:
35///
36/// ```rust
37/// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
38/// # use datafusion_datasource_parquet::ParquetAccessPlan;
39/// // Default to scan all row groups
40/// let mut access_plan = ParquetAccessPlan::new_all(4);
41/// access_plan.skip(0); // skip row group
42/// // Use parquet reader RowSelector to specify scanning rows 100-200 and 350-400
43/// // in a row group that has 1000 rows
44/// let row_selection = RowSelection::from(vec![
45///    RowSelector::skip(100),
46///    RowSelector::select(100),
47///    RowSelector::skip(150),
48///    RowSelector::select(50),
49///    RowSelector::skip(600),  // skip last 600 rows
50/// ]);
51/// access_plan.scan_selection(1, row_selection);
52/// access_plan.skip(2); // skip row group 2
53/// // row group 3 is scanned by default
54/// ```
55///
56/// The resulting plan would look like:
57///
58/// ```text
59/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
60///
61/// │                   │  SKIP
62///
63/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
64///  Row Group 0
65/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
66///  ┌────────────────┐    SCAN ONLY ROWS
67/// │└────────────────┘ │  100-200
68///  ┌────────────────┐    350-400
69/// │└────────────────┘ │
70///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
71///  Row Group 1
72/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
73///                        SKIP
74/// │                   │
75///
76/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
77///  Row Group 2
78/// ┌───────────────────┐
79/// │                   │  SCAN ALL ROWS
80/// │                   │
81/// │                   │
82/// └───────────────────┘
83///  Row Group 3
84/// ```
85#[derive(Debug, Clone, PartialEq)]
86pub struct ParquetAccessPlan {
87    /// How to access the i-th row group
88    row_groups: Vec<RowGroupAccess>,
89}
90
91/// Describes how the parquet reader will access a row group
92#[derive(Debug, Clone, PartialEq)]
93pub enum RowGroupAccess {
94    /// Do not read the row group at all
95    Skip,
96    /// Read all rows from the row group
97    Scan,
98    /// Scan only the specified rows within the row group
99    Selection(RowSelection),
100}
101
102impl RowGroupAccess {
103    /// Return true if this row group should be scanned
104    pub fn should_scan(&self) -> bool {
105        match self {
106            RowGroupAccess::Skip => false,
107            RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
108        }
109    }
110}
111
112impl ParquetAccessPlan {
113    /// Create a new `ParquetAccessPlan` that scans all row groups
114    pub fn new_all(row_group_count: usize) -> Self {
115        Self {
116            row_groups: vec![RowGroupAccess::Scan; row_group_count],
117        }
118    }
119
120    /// Create a new `ParquetAccessPlan` that scans no row groups
121    pub fn new_none(row_group_count: usize) -> Self {
122        Self {
123            row_groups: vec![RowGroupAccess::Skip; row_group_count],
124        }
125    }
126
127    /// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es
128    pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
129        Self { row_groups }
130    }
131
132    /// Set the i-th row group to the specified [`RowGroupAccess`]
133    pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
134        self.row_groups[idx] = access;
135    }
136
137    /// skips the i-th row group (should not be scanned)
138    pub fn skip(&mut self, idx: usize) {
139        self.set(idx, RowGroupAccess::Skip);
140    }
141
142    /// scan the i-th row group
143    pub fn scan(&mut self, idx: usize) {
144        self.set(idx, RowGroupAccess::Scan);
145    }
146
147    /// Return true if the i-th row group should be scanned
148    pub fn should_scan(&self, idx: usize) -> bool {
149        self.row_groups[idx].should_scan()
150    }
151
152    /// Set to scan only the [`RowSelection`] in the specified row group.
153    ///
154    /// Behavior is different depending on the existing access
155    /// * [`RowGroupAccess::Skip`]: does nothing
156    /// * [`RowGroupAccess::Scan`]: Updates to scan only the rows in the `RowSelection`
157    /// * [`RowGroupAccess::Selection`]: Updates to scan only the intersection of the existing selection and the new selection
158    pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
159        self.row_groups[idx] = match &self.row_groups[idx] {
160            // already skipping the entire row group
161            RowGroupAccess::Skip => RowGroupAccess::Skip,
162            RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
163            RowGroupAccess::Selection(existing_selection) => {
164                RowGroupAccess::Selection(existing_selection.intersection(&selection))
165            }
166        }
167    }
168
169    /// Return an overall `RowSelection`, if needed
170    ///
171    /// This is used to compute the row selection for the parquet reader. See
172    /// [`ArrowReaderBuilder::with_row_selection`] for more details.
173    ///
174    /// Returns
175    /// * `None` if there are no  [`RowGroupAccess::Selection`]
176    /// * `Some(selection)` if there are [`RowGroupAccess::Selection`]s
177    ///
178    /// The returned selection represents which rows to scan across any row
179    /// row groups which are not skipped.
180    ///
181    /// # Notes
182    ///
183    /// If there are no [`RowGroupAccess::Selection`]s, the overall row
184    /// selection is `None` because each row group is either entirely skipped or
185    /// scanned, which is covered by [`Self::row_group_indexes`].
186    ///
187    /// If there are any [`RowGroupAccess::Selection`], an overall row selection
188    /// is returned for *all* the rows in the row groups that are not skipped.
189    /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if any specified row selection does not specify
194    /// the same number of rows as in it's corresponding `row_group_metadata`.
195    ///
196    /// # Example: No Selections
197    ///
198    /// Given an access plan like this
199    ///
200    /// ```text
201    ///   RowGroupAccess::Scan (scan all row group 0)
202    ///   RowGroupAccess::Skip (skip row group 1)
203    ///   RowGroupAccess::Scan (scan all row group 2)
204    ///   RowGroupAccess::Scan (scan all row group 3)
205    /// ```
206    ///
207    /// The overall row selection would be `None` because there are no
208    /// [`RowGroupAccess::Selection`]s. The row group indexes
209    /// returned by [`Self::row_group_indexes`] would be `0, 2, 3` .
210    ///
211    /// # Example: With Selections
212    ///
213    /// Given an access plan like this:
214    ///
215    /// ```text
216    ///   RowGroupAccess::Scan (scan all row group 0)
217    ///   RowGroupAccess::Skip (skip row group 1)
218    ///   RowGroupAccess::Select (skip 50, scan 50, skip 900) (scan rows 50-100 in row group 2)
219    ///   RowGroupAccess::Scan (scan all row group 3)
220    /// ```
221    ///
222    /// Assuming each row group has 1000 rows, the resulting row selection would
223    /// be the rows to scan in row group 0, 2 and 4:
224    ///
225    /// ```text
226    ///  RowSelection::Select(1000) (scan all rows in row group 0)
227    ///  RowSelection::Skip(50)     (skip first 50 rows in row group 2)
228    ///  RowSelection::Select(50)   (scan rows 50-100 in row group 2)
229    ///  RowSelection::Skip(900)    (skip last 900 rows in row group 2)
230    ///  RowSelection::Select(1000) (scan all rows in row group 3)
231    /// ```
232    ///
233    /// Note there is no entry for the (entirely) skipped row group 1.
234    ///
235    /// The row group indexes returned by [`Self::row_group_indexes`] would
236    /// still be `0, 2, 3` .
237    ///
238    /// [`ArrowReaderBuilder::with_row_selection`]: parquet::arrow::arrow_reader::ArrowReaderBuilder::with_row_selection
239    pub fn into_overall_row_selection(
240        self,
241        row_group_meta_data: &[RowGroupMetaData],
242    ) -> Result<Option<RowSelection>> {
243        assert_eq!(row_group_meta_data.len(), self.row_groups.len());
244        // Intuition: entire row groups are filtered out using
245        // `row_group_indexes` which come from Skip and Scan. An overall
246        // RowSelection is only useful if there is any parts *within* a row group
247        // which can be filtered out, that is a `Selection`.
248        if !self
249            .row_groups
250            .iter()
251            .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
252        {
253            return Ok(None);
254        }
255
256        // validate all Selections
257        for (idx, (rg, rg_meta)) in self
258            .row_groups
259            .iter()
260            .zip(row_group_meta_data.iter())
261            .enumerate()
262        {
263            let RowGroupAccess::Selection(selection) = rg else {
264                continue;
265            };
266            let rows_in_selection = selection
267                .iter()
268                .map(|selection| selection.row_count)
269                .sum::<usize>();
270
271            let row_group_row_count = rg_meta.num_rows();
272            if rows_in_selection as i64 != row_group_row_count {
273                return internal_err!(
274                    "Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
275                    but selection only specifies {rows_in_selection} rows. \
276                    Selection: {selection:?}"
277                );
278            }
279        }
280
281        let total_selection: RowSelection = self
282            .row_groups
283            .into_iter()
284            .zip(row_group_meta_data.iter())
285            .flat_map(|(rg, rg_meta)| {
286                match rg {
287                    RowGroupAccess::Skip => vec![],
288                    RowGroupAccess::Scan => {
289                        // need a row group access to scan the entire row group (need row group counts)
290                        vec![RowSelector::select(rg_meta.num_rows() as usize)]
291                    }
292                    RowGroupAccess::Selection(selection) => {
293                        let selection: Vec<RowSelector> = selection.into();
294                        selection
295                    }
296                }
297            })
298            .collect();
299
300        Ok(Some(total_selection))
301    }
302
303    /// Return an iterator over the row group indexes that should be scanned
304    pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
305        self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
306            if b.should_scan() {
307                Some(idx)
308            } else {
309                None
310            }
311        })
312    }
313
314    /// Return a vec of all row group indexes to scan
315    pub fn row_group_indexes(&self) -> Vec<usize> {
316        self.row_group_index_iter().collect()
317    }
318
319    /// Return the total number of row groups (not the total number or groups to
320    /// scan)
321    pub fn len(&self) -> usize {
322        self.row_groups.len()
323    }
324
325    /// Return true if there are no row groups
326    pub fn is_empty(&self) -> bool {
327        self.row_groups.is_empty()
328    }
329
330    /// Get a reference to the inner accesses
331    pub fn inner(&self) -> &[RowGroupAccess] {
332        &self.row_groups
333    }
334
335    /// Covert into the inner row group accesses
336    pub fn into_inner(self) -> Vec<RowGroupAccess> {
337        self.row_groups
338    }
339}
340
#[cfg(test)]
mod test {
    use super::*;
    use datafusion_common::assert_contains;
    use parquet::basic::LogicalType;
    use parquet::file::metadata::ColumnChunkMetaData;
    use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
    use std::sync::{Arc, LazyLock};

    /// Collects the row group indexes of `plan`, then consumes it to compute
    /// the overall row selection against [`ROW_GROUP_METADATA`].
    fn evaluate(
        plan: ParquetAccessPlan,
    ) -> (Vec<usize>, datafusion_common::Result<Option<RowSelection>>) {
        let indexes = plan.row_group_indexes();
        let overall = plan.into_overall_row_selection(&ROW_GROUP_METADATA);
        (indexes, overall)
    }

    #[test]
    fn test_only_scans() {
        let plan = ParquetAccessPlan::new(vec![RowGroupAccess::Scan; 4]);

        let (indexes, overall) = evaluate(plan);
        let overall = overall.unwrap();

        // every row group is scanned and no selection is needed
        assert_eq!(indexes, vec![0, 1, 2, 3]);
        assert_eq!(overall, None);
    }

    #[test]
    fn test_only_skips() {
        let plan = ParquetAccessPlan::new(vec![RowGroupAccess::Skip; 4]);

        let (indexes, overall) = evaluate(plan);
        let overall = overall.unwrap();

        // every row group is skipped and no selection is needed
        assert_eq!(indexes, Vec::<usize>::new());
        assert_eq!(overall, None);
    }

    #[test]
    fn test_mixed_1() {
        // covers all 20 rows of row group 1
        let group_1_selection = RowSelection::from(vec![
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(8),
        ]);
        let plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(group_1_selection),
            RowGroupAccess::Skip,
            RowGroupAccess::Skip,
        ]);

        let (indexes, overall) = evaluate(plan);
        let overall = overall.unwrap();

        let expected = RowSelection::from(vec![
            // the whole first row group
            RowSelector::select(10),
            // the selectors given for the second row group
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(8),
        ]);
        assert_eq!(indexes, vec![0, 1]);
        assert_eq!(overall, Some(expected));
    }

    #[test]
    fn test_mixed_2() {
        // covers all 30 rows of row group 2
        let group_2_selection = RowSelection::from(vec![
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(18),
        ]);
        let plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Skip,
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(group_2_selection),
            RowGroupAccess::Scan,
        ]);

        let (indexes, overall) = evaluate(plan);
        let overall = overall.unwrap();

        let expected = RowSelection::from(vec![
            // the whole second row group
            RowSelector::select(20),
            // the selectors given for the third row group
            RowSelector::select(5),
            RowSelector::skip(7),
            RowSelector::select(18),
            // the whole fourth row group
            RowSelector::select(40),
        ]);
        assert_eq!(indexes, vec![1, 2, 3]);
        assert_eq!(overall, Some(expected));
    }

    #[test]
    fn test_invalid_too_few() {
        // the selection covers only 12 rows, but row group 1 has 20
        let short_selection =
            RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(7)]);
        let plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(short_selection),
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);

        let (indexes, overall) = evaluate(plan);
        let err = overall.unwrap_err().to_string();

        assert_eq!(indexes, vec![0, 1, 2, 3]);
        assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
    }

    #[test]
    fn test_invalid_too_many() {
        // the selection covers 22 rows, but row group 1 has only 20
        let long_selection = RowSelection::from(vec![
            RowSelector::select(10),
            RowSelector::skip(2),
            RowSelector::select(10),
        ]);
        let plan = ParquetAccessPlan::new(vec![
            RowGroupAccess::Scan,
            RowGroupAccess::Selection(long_selection),
            RowGroupAccess::Scan,
            RowGroupAccess::Scan,
        ]);

        let (indexes, overall) = evaluate(plan);
        let err = overall.unwrap_err().to_string();

        assert_eq!(indexes, vec![0, 1, 2, 3]);
        assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
    }

    /// Metadata for 4 row groups holding 10, 20, 30 and 40 rows respectively
    static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
        let schema = get_test_schema_descr();

        [10, 20, 30, 40]
            .into_iter()
            .map(|row_count| {
                let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
                    .set_num_values(row_count)
                    .build()
                    .unwrap();

                RowGroupMetaData::builder(schema.clone())
                    .set_num_rows(row_count)
                    .set_column_metadata(vec![column_chunk])
                    .build()
                    .unwrap()
            })
            .collect()
    });

    /// Schema with exactly one column, "a", of type `BYTE_ARRAY`/`String`
    fn get_test_schema_descr() -> SchemaDescPtr {
        use parquet::basic::Type as PhysicalType;
        use parquet::schema::types::Type as SchemaType;

        let column_a = SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY)
            .with_logical_type(Some(LogicalType::String))
            .build()
            .unwrap();
        let root = SchemaType::group_type_builder("schema")
            .with_fields(vec![Arc::new(column_a)])
            .build()
            .unwrap();

        Arc::new(SchemaDescriptor::new(Arc::new(root)))
    }
}