datafusion_datasource_parquet/
page_filter.rs1use std::collections::HashSet;
21use std::sync::Arc;
22
23use super::metrics::ParquetFileMetrics;
24use crate::ParquetAccessPlan;
25
26use arrow::array::BooleanArray;
27use arrow::{
28 array::ArrayRef,
29 datatypes::{Schema, SchemaRef},
30};
31use datafusion_common::pruning::PruningStatistics;
32use datafusion_common::ScalarValue;
33use datafusion_physical_expr::{split_conjunction, PhysicalExpr};
34use datafusion_pruning::PruningPredicate;
35
36use log::{debug, trace};
37use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
38use parquet::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex};
39use parquet::file::page_index::offset_index::PageLocation;
40use parquet::schema::types::SchemaDescriptor;
41use parquet::{
42 arrow::arrow_reader::{RowSelection, RowSelector},
43 file::metadata::{ParquetMetaData, RowGroupMetaData},
44};
45
/// Filters a [`ParquetAccessPlan`] using the per-page statistics stored in the
/// Parquet page index (offset index + column index).
///
/// Each top-level conjunct of the filter expression becomes its own
/// [`PruningPredicate`]. Only predicates that reference exactly one column and
/// are not always `true` are kept (see `new`).
#[derive(Debug)]
pub struct PagePruningAccessPlanFilter {
    /// Single-column pruning predicates, one per usable conjunct of the filter.
    /// When pruning a row group, the page selections they produce are intersected.
    predicates: Vec<PruningPredicate>,
}
117
118impl PagePruningAccessPlanFilter {
119 pub fn new(expr: &Arc<dyn PhysicalExpr>, schema: SchemaRef) -> Self {
122 let predicates = split_conjunction(expr)
124 .into_iter()
125 .filter_map(|predicate| {
126 let pp = match PruningPredicate::try_new(
127 Arc::clone(predicate),
128 Arc::clone(&schema),
129 ) {
130 Ok(pp) => pp,
131 Err(e) => {
132 debug!("Ignoring error creating page pruning predicate: {e}");
133 return None;
134 }
135 };
136
137 if pp.always_true() {
138 debug!("Ignoring always true page pruning predicate: {predicate}");
139 return None;
140 }
141
142 if pp.required_columns().single_column().is_none() {
143 debug!("Ignoring multi-column page pruning predicate: {predicate}");
144 return None;
145 }
146
147 Some(pp)
148 })
149 .collect::<Vec<_>>();
150 Self { predicates }
151 }
152
153 pub fn prune_plan_with_page_index(
156 &self,
157 mut access_plan: ParquetAccessPlan,
158 arrow_schema: &Schema,
159 parquet_schema: &SchemaDescriptor,
160 parquet_metadata: &ParquetMetaData,
161 file_metrics: &ParquetFileMetrics,
162 ) -> ParquetAccessPlan {
163 let _timer_guard = file_metrics.page_index_eval_time.timer();
165 if self.predicates.is_empty() {
166 return access_plan;
167 }
168
169 let page_index_predicates = &self.predicates;
170 let groups = parquet_metadata.row_groups();
171
172 if groups.is_empty() {
173 return access_plan;
174 }
175
176 if parquet_metadata.offset_index().is_none()
177 || parquet_metadata.column_index().is_none()
178 {
179 debug!(
180 "Can not prune pages due to lack of indexes. Have offset: {}, column index: {}",
181 parquet_metadata.offset_index().is_some(), parquet_metadata.column_index().is_some()
182 );
183 return access_plan;
184 };
185
186 let mut total_skip = 0;
188 let mut total_select = 0;
190
191 let row_group_indexes = access_plan.row_group_indexes();
193 for row_group_index in row_group_indexes {
194 let mut overall_selection = None;
196 for predicate in page_index_predicates {
197 let column = predicate
198 .required_columns()
199 .single_column()
200 .expect("Page pruning requires single column predicates");
201
202 let converter = StatisticsConverter::try_new(
203 column.name(),
204 arrow_schema,
205 parquet_schema,
206 );
207
208 let converter = match converter {
209 Ok(converter) => converter,
210 Err(e) => {
211 debug!(
212 "Could not create statistics converter for column {}: {e}",
213 column.name()
214 );
215 continue;
216 }
217 };
218
219 let selection = prune_pages_in_one_row_group(
220 row_group_index,
221 predicate,
222 converter,
223 parquet_metadata,
224 file_metrics,
225 );
226
227 let Some(selection) = selection else {
228 trace!("No pages pruned in prune_pages_in_one_row_group");
229 continue;
230 };
231
232 debug!("Use filter and page index to create RowSelection {:?} from predicate: {:?}",
233 &selection,
234 predicate.predicate_expr(),
235 );
236
237 overall_selection = update_selection(overall_selection, selection);
238
239 let selects_any = overall_selection
242 .as_ref()
243 .map(|selection| selection.selects_any())
244 .unwrap_or(true);
245
246 if !selects_any {
247 break;
248 }
249 }
250
251 if let Some(overall_selection) = overall_selection {
252 let rows_selected = overall_selection.row_count();
253 if rows_selected > 0 {
254 let rows_skipped = overall_selection.skipped_row_count();
255 trace!("Overall selection from predicate skipped {rows_skipped}, selected {rows_selected}: {overall_selection:?}");
256 total_skip += rows_skipped;
257 total_select += rows_selected;
258 access_plan.scan_selection(row_group_index, overall_selection)
259 } else {
260 let rows_skipped = groups[row_group_index].num_rows() as usize;
262 access_plan.skip(row_group_index);
263 total_skip += rows_skipped;
264 trace!(
265 "Overall selection from predicate is empty, \
266 skipping all {rows_skipped} rows in row group {row_group_index}"
267 );
268 }
269 }
270 }
271
272 file_metrics.page_index_rows_pruned.add_pruned(total_skip);
273 file_metrics
274 .page_index_rows_pruned
275 .add_matched(total_select);
276 access_plan
277 }
278
279 pub fn filter_number(&self) -> usize {
281 self.predicates.len()
282 }
283}
284
285fn update_selection(
286 current_selection: Option<RowSelection>,
287 row_selection: RowSelection,
288) -> Option<RowSelection> {
289 match current_selection {
290 None => Some(row_selection),
291 Some(current_selection) => Some(current_selection.intersection(&row_selection)),
292 }
293}
294
295fn prune_pages_in_one_row_group(
303 row_group_index: usize,
304 pruning_predicate: &PruningPredicate,
305 converter: StatisticsConverter<'_>,
306 parquet_metadata: &ParquetMetaData,
307 metrics: &ParquetFileMetrics,
308) -> Option<RowSelection> {
309 let pruning_stats =
310 PagesPruningStatistics::try_new(row_group_index, converter, parquet_metadata)?;
311
312 let values = match pruning_predicate.prune(&pruning_stats) {
316 Ok(values) => values,
317 Err(e) => {
318 debug!("Error evaluating page index predicate values {e}");
319 metrics.predicate_evaluation_errors.add(1);
320 return None;
321 }
322 };
323
324 let Some(page_row_counts) = pruning_stats.page_row_counts() else {
327 debug!(
328 "Can not determine page row counts for row group {row_group_index}, skipping"
329 );
330 metrics.predicate_evaluation_errors.add(1);
331 return None;
332 };
333
334 let mut vec = Vec::with_capacity(values.len());
335 assert_eq!(page_row_counts.len(), values.len());
336 let mut sum_row = *page_row_counts.first().unwrap();
337 let mut selected = *values.first().unwrap();
338 trace!("Pruned to {values:?} using {pruning_stats:?}");
339 for (i, &f) in values.iter().enumerate().skip(1) {
340 if f == selected {
341 sum_row += *page_row_counts.get(i).unwrap();
342 } else {
343 let selector = if selected {
344 RowSelector::select(sum_row)
345 } else {
346 RowSelector::skip(sum_row)
347 };
348 vec.push(selector);
349 sum_row = *page_row_counts.get(i).unwrap();
350 selected = f;
351 }
352 }
353
354 let selector = if selected {
355 RowSelector::select(sum_row)
356 } else {
357 RowSelector::skip(sum_row)
358 };
359 vec.push(selector);
360 Some(RowSelection::from(vec))
361}
362
363#[derive(Debug)]
365struct PagesPruningStatistics<'a> {
366 row_group_index: usize,
367 row_group_metadatas: &'a [RowGroupMetaData],
368 converter: StatisticsConverter<'a>,
369 column_index: &'a ParquetColumnIndex,
370 offset_index: &'a ParquetOffsetIndex,
371 page_offsets: &'a Vec<PageLocation>,
372}
373
374impl<'a> PagesPruningStatistics<'a> {
375 fn try_new(
381 row_group_index: usize,
382 converter: StatisticsConverter<'a>,
383 parquet_metadata: &'a ParquetMetaData,
384 ) -> Option<Self> {
385 let Some(parquet_column_index) = converter.parquet_column_index() else {
386 trace!(
387 "Column {:?} not in parquet file, skipping",
388 converter.arrow_field()
389 );
390 return None;
391 };
392
393 let column_index = parquet_metadata.column_index()?;
394 let offset_index = parquet_metadata.offset_index()?;
395 let row_group_metadatas = parquet_metadata.row_groups();
396
397 let Some(row_group_page_offsets) = offset_index.get(row_group_index) else {
398 trace!("No page offsets for row group {row_group_index}, skipping");
399 return None;
400 };
401 let Some(offset_index_metadata) =
402 row_group_page_offsets.get(parquet_column_index)
403 else {
404 trace!(
405 "No page offsets for column {:?} in row group {row_group_index}, skipping",
406 converter.arrow_field()
407 );
408 return None;
409 };
410 let page_offsets = offset_index_metadata.page_locations();
411
412 Some(Self {
413 row_group_index,
414 row_group_metadatas,
415 converter,
416 column_index,
417 offset_index,
418 page_offsets,
419 })
420 }
421
422 fn page_row_counts(&self) -> Option<Vec<usize>> {
424 let row_group_metadata = self
425 .row_group_metadatas
426 .get(self.row_group_index)
427 .unwrap();
429
430 let num_rows_in_row_group = row_group_metadata.num_rows() as usize;
431
432 let page_offsets = self.page_offsets;
433 let mut vec = Vec::with_capacity(page_offsets.len());
434 page_offsets.windows(2).for_each(|x| {
435 let start = x[0].first_row_index as usize;
436 let end = x[1].first_row_index as usize;
437 vec.push(end - start);
438 });
439 vec.push(num_rows_in_row_group - page_offsets.last()?.first_row_index as usize);
440 Some(vec)
441 }
442}
443impl PruningStatistics for PagesPruningStatistics<'_> {
444 fn min_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
445 match self.converter.data_page_mins(
446 self.column_index,
447 self.offset_index,
448 [&self.row_group_index],
449 ) {
450 Ok(min_values) => Some(min_values),
451 Err(e) => {
452 debug!("Error evaluating data page min values {e}");
453 None
454 }
455 }
456 }
457
458 fn max_values(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
459 match self.converter.data_page_maxes(
460 self.column_index,
461 self.offset_index,
462 [&self.row_group_index],
463 ) {
464 Ok(min_values) => Some(min_values),
465 Err(e) => {
466 debug!("Error evaluating data page max values {e}");
467 None
468 }
469 }
470 }
471
472 fn num_containers(&self) -> usize {
473 self.page_offsets.len()
474 }
475
476 fn null_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
477 match self.converter.data_page_null_counts(
478 self.column_index,
479 self.offset_index,
480 [&self.row_group_index],
481 ) {
482 Ok(null_counts) => Some(Arc::new(null_counts)),
483 Err(e) => {
484 debug!("Error evaluating data page null counts {e}");
485 None
486 }
487 }
488 }
489
490 fn row_counts(&self, _column: &datafusion_common::Column) -> Option<ArrayRef> {
491 match self.converter.data_page_row_counts(
492 self.offset_index,
493 self.row_group_metadatas,
494 [&self.row_group_index],
495 ) {
496 Ok(row_counts) => row_counts.map(|a| Arc::new(a) as ArrayRef),
497 Err(e) => {
498 debug!("Error evaluating data page row counts {e}");
499 None
500 }
501 }
502 }
503
504 fn contained(
505 &self,
506 _column: &datafusion_common::Column,
507 _values: &HashSet<ScalarValue>,
508 ) -> Option<BooleanArray> {
509 None
510 }
511}