datafusion_datasource_parquet/access_plan.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion_common::{internal_err, Result};
19use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
20use parquet::file::metadata::RowGroupMetaData;
21
/// A selection of rows and row groups within a ParquetFile to decode.
///
/// A `ParquetAccessPlan` is used to limit the row groups and data pages a `DataSourceExec`
/// will read and decode to improve performance.
///
/// Note that row filtering based on `ArrowPredicate` is applied after all of
/// these selections.
///
/// # Example
///
/// For example, given a Parquet file with 4 row groups, a `ParquetAccessPlan`
/// can be used to specify skipping row groups 0 and 2, scanning a range of rows
/// in row group 1, and scanning all rows in row group 3 as follows:
///
/// ```rust
/// # use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
/// # use datafusion_datasource_parquet::ParquetAccessPlan;
/// // Default to scan all row groups
/// let mut access_plan = ParquetAccessPlan::new_all(4);
/// access_plan.skip(0); // skip row group 0
/// // Use parquet reader RowSelector to specify scanning rows 100-200 and 350-400
/// // in a row group that has 1000 rows
/// let row_selection = RowSelection::from(vec![
///    RowSelector::skip(100),
///    RowSelector::select(100),
///    RowSelector::skip(150),
///    RowSelector::select(50),
///    RowSelector::skip(600),  // skip last 600 rows
/// ]);
/// access_plan.scan_selection(1, row_selection);
/// access_plan.skip(2); // skip row group 2
/// // row group 3 is scanned by default
/// ```
///
/// The resulting plan would look like:
///
/// ```text
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///
/// │                   │  SKIP
///
/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// Row Group 0
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///  ┌────────────────┐    SCAN ONLY ROWS
/// │└────────────────┘ │  100-200
///  ┌────────────────┐    350-400
/// │└────────────────┘ │
///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
/// Row Group 1
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///                        SKIP
/// │                   │
///
/// └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
/// Row Group 2
/// ┌───────────────────┐
/// │                   │  SCAN ALL ROWS
/// │                   │
/// │                   │
/// └───────────────────┘
/// Row Group 3
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetAccessPlan {
    /// How to access the i-th row group (one entry per row group in the file)
    row_groups: Vec<RowGroupAccess>,
}
89}
90
91/// Describes how the parquet reader will access a row group
92#[derive(Debug, Clone, PartialEq)]
93pub enum RowGroupAccess {
94 /// Do not read the row group at all
95 Skip,
96 /// Read all rows from the row group
97 Scan,
98 /// Scan only the specified rows within the row group
99 Selection(RowSelection),
100}
101
102impl RowGroupAccess {
103 /// Return true if this row group should be scanned
104 pub fn should_scan(&self) -> bool {
105 match self {
106 RowGroupAccess::Skip => false,
107 RowGroupAccess::Scan | RowGroupAccess::Selection(_) => true,
108 }
109 }
110}
111
112impl ParquetAccessPlan {
113 /// Create a new `ParquetAccessPlan` that scans all row groups
114 pub fn new_all(row_group_count: usize) -> Self {
115 Self {
116 row_groups: vec![RowGroupAccess::Scan; row_group_count],
117 }
118 }
119
120 /// Create a new `ParquetAccessPlan` that scans no row groups
121 pub fn new_none(row_group_count: usize) -> Self {
122 Self {
123 row_groups: vec![RowGroupAccess::Skip; row_group_count],
124 }
125 }
126
127 /// Create a new `ParquetAccessPlan` from the specified [`RowGroupAccess`]es
128 pub fn new(row_groups: Vec<RowGroupAccess>) -> Self {
129 Self { row_groups }
130 }
131
132 /// Set the i-th row group to the specified [`RowGroupAccess`]
133 pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
134 self.row_groups[idx] = access;
135 }
136
137 /// skips the i-th row group (should not be scanned)
138 pub fn skip(&mut self, idx: usize) {
139 self.set(idx, RowGroupAccess::Skip);
140 }
141
142 /// scan the i-th row group
143 pub fn scan(&mut self, idx: usize) {
144 self.set(idx, RowGroupAccess::Scan);
145 }
146
147 /// Return true if the i-th row group should be scanned
148 pub fn should_scan(&self, idx: usize) -> bool {
149 self.row_groups[idx].should_scan()
150 }
151
152 /// Set to scan only the [`RowSelection`] in the specified row group.
153 ///
154 /// Behavior is different depending on the existing access
155 /// * [`RowGroupAccess::Skip`]: does nothing
156 /// * [`RowGroupAccess::Scan`]: Updates to scan only the rows in the `RowSelection`
157 /// * [`RowGroupAccess::Selection`]: Updates to scan only the intersection of the existing selection and the new selection
158 pub fn scan_selection(&mut self, idx: usize, selection: RowSelection) {
159 self.row_groups[idx] = match &self.row_groups[idx] {
160 // already skipping the entire row group
161 RowGroupAccess::Skip => RowGroupAccess::Skip,
162 RowGroupAccess::Scan => RowGroupAccess::Selection(selection),
163 RowGroupAccess::Selection(existing_selection) => {
164 RowGroupAccess::Selection(existing_selection.intersection(&selection))
165 }
166 }
167 }
168
169 /// Return an overall `RowSelection`, if needed
170 ///
171 /// This is used to compute the row selection for the parquet reader. See
172 /// [`ArrowReaderBuilder::with_row_selection`] for more details.
173 ///
174 /// Returns
175 /// * `None` if there are no [`RowGroupAccess::Selection`]
176 /// * `Some(selection)` if there are [`RowGroupAccess::Selection`]s
177 ///
178 /// The returned selection represents which rows to scan across any row
179 /// row groups which are not skipped.
180 ///
181 /// # Notes
182 ///
183 /// If there are no [`RowGroupAccess::Selection`]s, the overall row
184 /// selection is `None` because each row group is either entirely skipped or
185 /// scanned, which is covered by [`Self::row_group_indexes`].
186 ///
187 /// If there are any [`RowGroupAccess::Selection`], an overall row selection
188 /// is returned for *all* the rows in the row groups that are not skipped.
189 /// Thus it includes a `Select` selection for any [`RowGroupAccess::Scan`].
190 ///
191 /// # Errors
192 ///
193 /// Returns an error if any specified row selection does not specify
194 /// the same number of rows as in it's corresponding `row_group_metadata`.
195 ///
196 /// # Example: No Selections
197 ///
198 /// Given an access plan like this
199 ///
200 /// ```text
201 /// RowGroupAccess::Scan (scan all row group 0)
202 /// RowGroupAccess::Skip (skip row group 1)
203 /// RowGroupAccess::Scan (scan all row group 2)
204 /// RowGroupAccess::Scan (scan all row group 3)
205 /// ```
206 ///
207 /// The overall row selection would be `None` because there are no
208 /// [`RowGroupAccess::Selection`]s. The row group indexes
209 /// returned by [`Self::row_group_indexes`] would be `0, 2, 3` .
210 ///
211 /// # Example: With Selections
212 ///
213 /// Given an access plan like this:
214 ///
215 /// ```text
216 /// RowGroupAccess::Scan (scan all row group 0)
217 /// RowGroupAccess::Skip (skip row group 1)
218 /// RowGroupAccess::Select (skip 50, scan 50, skip 900) (scan rows 50-100 in row group 2)
219 /// RowGroupAccess::Scan (scan all row group 3)
220 /// ```
221 ///
222 /// Assuming each row group has 1000 rows, the resulting row selection would
223 /// be the rows to scan in row group 0, 2 and 4:
224 ///
225 /// ```text
226 /// RowSelection::Select(1000) (scan all rows in row group 0)
227 /// RowSelection::Skip(50) (skip first 50 rows in row group 2)
228 /// RowSelection::Select(50) (scan rows 50-100 in row group 2)
229 /// RowSelection::Skip(900) (skip last 900 rows in row group 2)
230 /// RowSelection::Select(1000) (scan all rows in row group 3)
231 /// ```
232 ///
233 /// Note there is no entry for the (entirely) skipped row group 1.
234 ///
235 /// The row group indexes returned by [`Self::row_group_indexes`] would
236 /// still be `0, 2, 3` .
237 ///
238 /// [`ArrowReaderBuilder::with_row_selection`]: parquet::arrow::arrow_reader::ArrowReaderBuilder::with_row_selection
239 pub fn into_overall_row_selection(
240 self,
241 row_group_meta_data: &[RowGroupMetaData],
242 ) -> Result<Option<RowSelection>> {
243 assert_eq!(row_group_meta_data.len(), self.row_groups.len());
244 // Intuition: entire row groups are filtered out using
245 // `row_group_indexes` which come from Skip and Scan. An overall
246 // RowSelection is only useful if there is any parts *within* a row group
247 // which can be filtered out, that is a `Selection`.
248 if !self
249 .row_groups
250 .iter()
251 .any(|rg| matches!(rg, RowGroupAccess::Selection(_)))
252 {
253 return Ok(None);
254 }
255
256 // validate all Selections
257 for (idx, (rg, rg_meta)) in self
258 .row_groups
259 .iter()
260 .zip(row_group_meta_data.iter())
261 .enumerate()
262 {
263 let RowGroupAccess::Selection(selection) = rg else {
264 continue;
265 };
266 let rows_in_selection = selection
267 .iter()
268 .map(|selection| selection.row_count)
269 .sum::<usize>();
270
271 let row_group_row_count = rg_meta.num_rows();
272 if rows_in_selection as i64 != row_group_row_count {
273 return internal_err!(
274 "Invalid ParquetAccessPlan Selection. Row group {idx} has {row_group_row_count} rows \
275 but selection only specifies {rows_in_selection} rows. \
276 Selection: {selection:?}"
277 );
278 }
279 }
280
281 let total_selection: RowSelection = self
282 .row_groups
283 .into_iter()
284 .zip(row_group_meta_data.iter())
285 .flat_map(|(rg, rg_meta)| {
286 match rg {
287 RowGroupAccess::Skip => vec![],
288 RowGroupAccess::Scan => {
289 // need a row group access to scan the entire row group (need row group counts)
290 vec![RowSelector::select(rg_meta.num_rows() as usize)]
291 }
292 RowGroupAccess::Selection(selection) => {
293 let selection: Vec<RowSelector> = selection.into();
294 selection
295 }
296 }
297 })
298 .collect();
299
300 Ok(Some(total_selection))
301 }
302
303 /// Return an iterator over the row group indexes that should be scanned
304 pub fn row_group_index_iter(&self) -> impl Iterator<Item = usize> + '_ {
305 self.row_groups.iter().enumerate().filter_map(|(idx, b)| {
306 if b.should_scan() {
307 Some(idx)
308 } else {
309 None
310 }
311 })
312 }
313
314 /// Return a vec of all row group indexes to scan
315 pub fn row_group_indexes(&self) -> Vec<usize> {
316 self.row_group_index_iter().collect()
317 }
318
319 /// Return the total number of row groups (not the total number or groups to
320 /// scan)
321 pub fn len(&self) -> usize {
322 self.row_groups.len()
323 }
324
325 /// Return true if there are no row groups
326 pub fn is_empty(&self) -> bool {
327 self.row_groups.is_empty()
328 }
329
330 /// Get a reference to the inner accesses
331 pub fn inner(&self) -> &[RowGroupAccess] {
332 &self.row_groups
333 }
334
335 /// Covert into the inner row group accesses
336 pub fn into_inner(self) -> Vec<RowGroupAccess> {
337 self.row_groups
338 }
339}
340
341#[cfg(test)]
342mod test {
343 use super::*;
344 use datafusion_common::assert_contains;
345 use parquet::basic::LogicalType;
346 use parquet::file::metadata::ColumnChunkMetaData;
347 use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};
348 use std::sync::{Arc, LazyLock};
349
350 #[test]
351 fn test_only_scans() {
352 let access_plan = ParquetAccessPlan::new(vec![
353 RowGroupAccess::Scan,
354 RowGroupAccess::Scan,
355 RowGroupAccess::Scan,
356 RowGroupAccess::Scan,
357 ]);
358
359 let row_group_indexes = access_plan.row_group_indexes();
360 let row_selection = access_plan
361 .into_overall_row_selection(&ROW_GROUP_METADATA)
362 .unwrap();
363
364 // scan all row groups, no selection
365 assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
366 assert_eq!(row_selection, None);
367 }
368
369 #[test]
370 fn test_only_skips() {
371 let access_plan = ParquetAccessPlan::new(vec![
372 RowGroupAccess::Skip,
373 RowGroupAccess::Skip,
374 RowGroupAccess::Skip,
375 RowGroupAccess::Skip,
376 ]);
377
378 let row_group_indexes = access_plan.row_group_indexes();
379 let row_selection = access_plan
380 .into_overall_row_selection(&ROW_GROUP_METADATA)
381 .unwrap();
382
383 // skip all row groups, no selection
384 assert_eq!(row_group_indexes, vec![] as Vec<usize>);
385 assert_eq!(row_selection, None);
386 }
387 #[test]
388 fn test_mixed_1() {
389 let access_plan = ParquetAccessPlan::new(vec![
390 RowGroupAccess::Scan,
391 RowGroupAccess::Selection(
392 // specifies all 20 rows in row group 1
393 vec![
394 RowSelector::select(5),
395 RowSelector::skip(7),
396 RowSelector::select(8),
397 ]
398 .into(),
399 ),
400 RowGroupAccess::Skip,
401 RowGroupAccess::Skip,
402 ]);
403
404 let row_group_indexes = access_plan.row_group_indexes();
405 let row_selection = access_plan
406 .into_overall_row_selection(&ROW_GROUP_METADATA)
407 .unwrap();
408
409 assert_eq!(row_group_indexes, vec![0, 1]);
410 assert_eq!(
411 row_selection,
412 Some(
413 vec![
414 // select the entire first row group
415 RowSelector::select(10),
416 // selectors from the second row group
417 RowSelector::select(5),
418 RowSelector::skip(7),
419 RowSelector::select(8)
420 ]
421 .into()
422 )
423 );
424 }
425
426 #[test]
427 fn test_mixed_2() {
428 let access_plan = ParquetAccessPlan::new(vec![
429 RowGroupAccess::Skip,
430 RowGroupAccess::Scan,
431 RowGroupAccess::Selection(
432 // specify all 30 rows in row group 1
433 vec![
434 RowSelector::select(5),
435 RowSelector::skip(7),
436 RowSelector::select(18),
437 ]
438 .into(),
439 ),
440 RowGroupAccess::Scan,
441 ]);
442
443 let row_group_indexes = access_plan.row_group_indexes();
444 let row_selection = access_plan
445 .into_overall_row_selection(&ROW_GROUP_METADATA)
446 .unwrap();
447
448 assert_eq!(row_group_indexes, vec![1, 2, 3]);
449 assert_eq!(
450 row_selection,
451 Some(
452 vec![
453 // select the entire second row group
454 RowSelector::select(20),
455 // selectors from the third row group
456 RowSelector::select(5),
457 RowSelector::skip(7),
458 RowSelector::select(18),
459 // select the entire fourth row group
460 RowSelector::select(40),
461 ]
462 .into()
463 )
464 );
465 }
466
467 #[test]
468 fn test_invalid_too_few() {
469 let access_plan = ParquetAccessPlan::new(vec![
470 RowGroupAccess::Scan,
471 // specify only 12 rows in selection, but row group 1 has 20
472 RowGroupAccess::Selection(
473 vec![RowSelector::select(5), RowSelector::skip(7)].into(),
474 ),
475 RowGroupAccess::Scan,
476 RowGroupAccess::Scan,
477 ]);
478
479 let row_group_indexes = access_plan.row_group_indexes();
480 let err = access_plan
481 .into_overall_row_selection(&ROW_GROUP_METADATA)
482 .unwrap_err()
483 .to_string();
484 assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
485 assert_contains!(err, "Internal error: Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 12 rows");
486 }
487
488 #[test]
489 fn test_invalid_too_many() {
490 let access_plan = ParquetAccessPlan::new(vec![
491 RowGroupAccess::Scan,
492 // specify 22 rows in selection, but row group 1 has only 20
493 RowGroupAccess::Selection(
494 vec![
495 RowSelector::select(10),
496 RowSelector::skip(2),
497 RowSelector::select(10),
498 ]
499 .into(),
500 ),
501 RowGroupAccess::Scan,
502 RowGroupAccess::Scan,
503 ]);
504
505 let row_group_indexes = access_plan.row_group_indexes();
506 let err = access_plan
507 .into_overall_row_selection(&ROW_GROUP_METADATA)
508 .unwrap_err()
509 .to_string();
510 assert_eq!(row_group_indexes, vec![0, 1, 2, 3]);
511 assert_contains!(err, "Invalid ParquetAccessPlan Selection. Row group 1 has 20 rows but selection only specifies 22 rows");
512 }
513
514 /// [`RowGroupMetaData`] that returns 4 row groups with 10, 20, 30, 40 rows
515 /// respectively
516 static ROW_GROUP_METADATA: LazyLock<Vec<RowGroupMetaData>> = LazyLock::new(|| {
517 let schema_descr = get_test_schema_descr();
518 let row_counts = [10, 20, 30, 40];
519
520 row_counts
521 .into_iter()
522 .map(|num_rows| {
523 let column = ColumnChunkMetaData::builder(schema_descr.column(0))
524 .set_num_values(num_rows)
525 .build()
526 .unwrap();
527
528 RowGroupMetaData::builder(schema_descr.clone())
529 .set_num_rows(num_rows)
530 .set_column_metadata(vec![column])
531 .build()
532 .unwrap()
533 })
534 .collect()
535 });
536
537 /// Single column schema with a single column named "a" of type `BYTE_ARRAY`/`String`
538 fn get_test_schema_descr() -> SchemaDescPtr {
539 use parquet::basic::Type as PhysicalType;
540 use parquet::schema::types::Type as SchemaType;
541 let field = SchemaType::primitive_type_builder("a", PhysicalType::BYTE_ARRAY)
542 .with_logical_type(Some(LogicalType::String))
543 .build()
544 .unwrap();
545 let schema = SchemaType::group_type_builder("schema")
546 .with_fields(vec![Arc::new(field)])
547 .build()
548 .unwrap();
549 Arc::new(SchemaDescriptor::new(Arc::new(schema)))
550 }
551}