use crate::{
    apply_file_schema_type_coercions, coerce_int96_to_resolution, ObjectStoreFetch,
};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Handle for fetching and interpreting the Parquet metadata of a single
/// file: reads [`ParquetMetaData`] from an [`ObjectStore`], optionally
/// caching it, and derives the Arrow schema and [`Statistics`] from it.
#[derive(Debug)]
pub struct DFParquetMetadata<'a> {
    store: &'a dyn ObjectStore,
    object_meta: &'a ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<Arc<FileDecryptionProperties>>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    pub coerce_int96: Option<TimeUnit>,
}

impl<'a> DFParquetMetadata<'a> {
    pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
        Self {
            store,
            object_meta,
            metadata_size_hint: None,
            decryption_properties: None,
            file_metadata_cache: None,
            coerce_int96: None,
        }
    }

    /// Set the size hint used to prefetch the metadata footer, if known.
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = metadata_size_hint;
        self
    }

    /// Set the decryption properties for reading encrypted Parquet files.
    pub fn with_decryption_properties(
        mut self,
        decryption_properties: Option<Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.decryption_properties = decryption_properties;
        self
    }

    /// Set the cache used to avoid re-fetching metadata for the same file.
    pub fn with_file_metadata_cache(
        mut self,
        file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        self.file_metadata_cache = file_metadata_cache;
        self
    }

    /// Coerce INT96 timestamp columns to the given [`TimeUnit`] resolution.
    pub fn with_coerce_int96(mut self, time_unit: Option<TimeUnit>) -> Self {
        self.coerce_int96 = time_unit;
        self
    }

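    /// Fetch the [`ParquetMetaData`] for the file, consulting the
    /// [`FileMetadataCache`] first when one is configured and caching the
    /// result on a miss.
    ///
    /// A minimal usage sketch (hypothetical setup: assumes an `ObjectStore`
    /// that already holds a Parquet file at `path`):
    ///
    /// ```ignore
    /// let object_meta = store.head(&path).await?;
    /// let metadata = DFParquetMetadata::new(&store, &object_meta)
    ///     .with_metadata_size_hint(Some(64 * 1024))
    ///     .fetch_metadata()
    ///     .await?;
    /// println!("row groups: {}", metadata.num_row_groups());
    /// ```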
    pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
        let Self {
            store,
            object_meta,
            metadata_size_hint,
            decryption_properties,
            file_metadata_cache,
            coerce_int96: _,
        } = self;

        let fetch = ObjectStoreFetch::new(*store, object_meta);

        // Don't cache metadata for encrypted files: decrypted metadata must
        // not be shared across requests that may use different keys.
        let cache_metadata =
            !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();

        if cache_metadata {
            if let Some(parquet_metadata) = file_metadata_cache
                .as_ref()
                .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
                .and_then(|file_metadata| {
                    file_metadata
                        .as_any()
                        .downcast_ref::<CachedParquetMetaData>()
                        .map(|cached_parquet_metadata| {
                            Arc::clone(cached_parquet_metadata.parquet_metadata())
                        })
                })
            {
                return Ok(parquet_metadata);
            }
        }

        let mut reader =
            ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);

        #[cfg(feature = "parquet_encryption")]
        if let Some(decryption_properties) = decryption_properties {
            reader = reader
                .with_decryption_properties(Some(Arc::clone(decryption_properties)));
        }

        // When the result will be cached, also try to load the page index so
        // the cached entry can serve later requests that need it.
        if cache_metadata && file_metadata_cache.is_some() {
            reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
        }

        let metadata = Arc::new(
            reader
                .load_and_finish(fetch, object_meta.size)
                .await
                .map_err(DataFusionError::from)?,
        );

        if cache_metadata {
            if let Some(file_metadata_cache) = file_metadata_cache {
                file_metadata_cache.put(
                    object_meta,
                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
                );
            }
        }

        Ok(metadata)
    }

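    /// Fetch the metadata and convert the file's Parquet schema into an Arrow
    /// [`Schema`], coercing INT96 timestamps when configured via
    /// [`Self::with_coerce_int96`].
    ///
    /// A sketch, reusing the hypothetical `store` and `object_meta` from
    /// [`Self::fetch_metadata`]:
    ///
    /// ```ignore
    /// use arrow::datatypes::TimeUnit;
    ///
    /// let schema = DFParquetMetadata::new(&store, &object_meta)
    ///     .with_coerce_int96(Some(TimeUnit::Microsecond))
    ///     .fetch_schema()
    ///     .await?;
    /// println!("fields: {}", schema.fields().len());
    /// ```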
    pub async fn fetch_schema(&self) -> Result<Schema> {
        let metadata = self.fetch_metadata().await?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;
        let schema = self
            .coerce_int96
            .as_ref()
            .and_then(|time_unit| {
                coerce_int96_to_resolution(
                    file_metadata.schema_descr(),
                    &schema,
                    time_unit,
                )
            })
            .unwrap_or(schema);
        Ok(schema)
    }

    pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
        let loc_path = self.object_meta.location.clone();
        let schema = self.fetch_schema().await?;
        Ok((loc_path, schema))
    }

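    /// Fetch the metadata and summarize it into [`Statistics`] mapped onto
    /// `table_schema`; shorthand for [`Self::fetch_metadata`] followed by
    /// [`Self::statistics_from_parquet_metadata`].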
    pub async fn fetch_statistics(
        &self,
        table_schema: &SchemaRef,
    ) -> Result<Statistics> {
        let metadata = self.fetch_metadata().await?;
        Self::statistics_from_parquet_metadata(&metadata, table_schema)
    }

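    /// Convert the statistics in a [`ParquetMetaData`] into [`Statistics`]
    /// for `table_schema`.
    ///
    /// Row counts and byte sizes are summed over all row groups and reported
    /// as [`Precision::Exact`]. Per-column min/max values are folded across
    /// row groups with [`MinAccumulator`] and [`MaxAccumulator`] and reported
    /// as exact only when at least one row group stores the matching value as
    /// an exact statistic; otherwise they are [`Precision::Inexact`]. Columns
    /// for which no statistics converter can be created are treated as
    /// entirely null.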
    pub fn statistics_from_parquet_metadata(
        metadata: &ParquetMetaData,
        table_schema: &SchemaRef,
    ) -> Result<Statistics> {
        let row_groups_metadata = metadata.row_groups();

        let mut statistics = Statistics::new_unknown(table_schema);
        let mut has_statistics = false;
        let mut num_rows = 0_usize;
        let mut total_byte_size = 0_usize;
        for row_group_meta in row_groups_metadata {
            num_rows += row_group_meta.num_rows() as usize;
            total_byte_size += row_group_meta.total_byte_size() as usize;

            if !has_statistics {
                has_statistics = row_group_meta
                    .columns()
                    .iter()
                    .any(|column| column.statistics().is_some());
            }
        }
        statistics.num_rows = Precision::Exact(num_rows);
        statistics.total_byte_size = Precision::Exact(total_byte_size);

        let file_metadata = metadata.file_metadata();
        let mut file_schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;

        if let Some(merged) = apply_file_schema_type_coercions(table_schema, &file_schema)
        {
            file_schema = merged;
        }

        statistics.column_statistics = if has_statistics {
            let (mut max_accs, mut min_accs) = create_max_min_accs(table_schema);
            let mut null_counts_array =
                vec![Precision::Exact(0); table_schema.fields().len()];
            let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()];
            let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()];
            table_schema
                .fields()
                .iter()
                .enumerate()
                .for_each(|(idx, field)| {
                    match StatisticsConverter::try_new(
                        field.name(),
                        &file_schema,
                        file_metadata.schema_descr(),
                    ) {
                        Ok(stats_converter) => {
                            let mut accumulators = StatisticsAccumulators {
                                min_accs: &mut min_accs,
                                max_accs: &mut max_accs,
                                null_counts_array: &mut null_counts_array,
                                is_min_value_exact: &mut is_min_value_exact,
                                is_max_value_exact: &mut is_max_value_exact,
                            };
                            summarize_min_max_null_counts(
                                &mut accumulators,
                                idx,
                                &stats_converter,
                                row_groups_metadata,
                            )
                            .ok();
                        }
                        Err(e) => {
                            // The column is missing from the file or its
                            // statistics cannot be interpreted; treat every
                            // row as null for this column.
                            debug!("Failed to create statistics converter: {e}");
                            null_counts_array[idx] = Precision::Exact(num_rows);
                        }
                    }
                });

            get_col_stats(
                table_schema,
                null_counts_array,
                &mut max_accs,
                &mut min_accs,
                &mut is_max_value_exact,
                &mut is_min_value_exact,
            )
        } else {
            Statistics::unknown_column(table_schema)
        };

        Ok(statistics)
    }
}

/// Return the data type to use when accumulating min/max values for a field.
///
/// Dictionary encoding does not affect min/max aggregation, so for dictionary
/// columns the accumulator operates on the underlying value type.
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

/// Create one optional [`MaxAccumulator`] and [`MinAccumulator`] per field;
/// `None` for fields whose type does not support min/max aggregation.
fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values: Vec<Option<MaxAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    let min_values: Vec<Option<MinAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    (max_values, min_values)
}

/// Assemble the per-column [`ColumnStatistics`] from the accumulated values,
/// downgrading min/max to [`Precision::Inexact`] when the exactness flags do
/// not confirm them.
fn get_col_stats(
    schema: &Schema,
    null_counts: Vec<Precision<usize>>,
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    is_max_value_exact: &mut [Option<bool>],
    is_min_value_exact: &mut [Option<bool>],
) -> Vec<ColumnStatistics> {
    (0..schema.fields().len())
        .map(|i| {
            let max_value = match (
                max_values.get_mut(i).unwrap(),
                is_max_value_exact.get(i).unwrap(),
            ) {
                (Some(max_value), Some(true)) => {
                    max_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(max_value), Some(false)) | (Some(max_value), None) => {
                    max_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            let min_value = match (
                min_values.get_mut(i).unwrap(),
                is_min_value_exact.get(i).unwrap(),
            ) {
                (Some(min_value), Some(true)) => {
                    min_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(min_value), Some(false)) | (Some(min_value), None) => {
                    min_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            ColumnStatistics {
                null_count: null_counts[i],
                max_value: max_value.unwrap_or(Precision::Absent),
                min_value: min_value.unwrap_or(Precision::Absent),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            }
        })
        .collect()
}

/// Mutable views over the per-column accumulators and exactness flags filled
/// in by [`summarize_min_max_null_counts`].
struct StatisticsAccumulators<'a> {
    min_accs: &'a mut [Option<MinAccumulator>],
    max_accs: &'a mut [Option<MaxAccumulator>],
    null_counts_array: &'a mut [Precision<usize>],
    is_min_value_exact: &'a mut [Option<bool>],
    is_max_value_exact: &'a mut [Option<bool>],
}

/// Fold one column's row-group statistics into the accumulators: update the
/// min/max accumulators, record whether the aggregated values are exact, and
/// sum the null counts.
fn summarize_min_max_null_counts(
    accumulators: &mut StatisticsAccumulators,
    arrow_schema_index: usize,
    stats_converter: &StatisticsConverter,
    row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
    let is_max_value_exact_stat =
        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
    let is_min_value_exact_stat =
        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;

    if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] {
        max_acc.update_batch(&[Arc::clone(&max_values)])?;
        let mut cur_max_acc = max_acc.clone();
        accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match(
            cur_max_acc.evaluate()?,
            max_values,
            is_max_value_exact_stat,
        );
    }

    if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] {
        min_acc.update_batch(&[Arc::clone(&min_values)])?;
        let mut cur_min_acc = min_acc.clone();
        accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match(
            cur_min_acc.evaluate()?,
            min_values,
            is_min_value_exact_stat,
        );
    }

    accumulators.null_counts_array[arrow_schema_index] = match sum(&null_counts) {
        Some(null_count) => Precision::Exact(null_count as usize),
        None => match null_counts.len() {
            // No row groups at all: there are no rows, hence no nulls.
            0 => Precision::Exact(0),
            // Null counts are missing from the statistics.
            _ => Precision::Absent,
        },
    };

    Ok(())
}

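/// Determine whether an aggregated min/max `value` is exact.
///
/// `array` holds the per-row-group min (or max) values and `exactness` the
/// corresponding "is exact" flags from the Parquet statistics. The aggregate
/// counts as exact only if some row group both equals `value` and stored it
/// as exact; a truncated statistic, for example, may equal the aggregate
/// without being exact. Returns `None` when the comparison itself fails.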
fn has_any_exact_match(
    value: ScalarValue,
    array: ArrayRef,
    exactness: BooleanArray,
) -> Option<bool> {
    let scalar_array = value.to_scalar().ok()?;
    let eq_mask = eq(&scalar_array, &array).ok()?;
    let combined_mask = and(&eq_mask, &exactness).ok()?;
    Some(combined_mask.true_count() > 0)
}

/// Newtype wrapper allowing [`ParquetMetaData`] to be stored in a
/// [`FileMetadataCache`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use datafusion_common::ScalarValue;
    use std::sync::Arc;

    #[test]
    fn test_has_any_exact_match() {
        // The aggregated min appears in a row group whose min is exact.
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![true, false, false, false, false, false]);

            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
            assert_eq!(result, Some(true));
        }
        // The aggregated min appears only in row groups with inexact minima.
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, false, false, false, false, false]);

            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
            assert_eq!(result, Some(false));
        }
        // The aggregated max appears in several row groups, some of them exact.
        {
            let computed_max = ScalarValue::Int32(Some(5));
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, true, true, true, false, true]);

            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
            assert_eq!(result, Some(true));
        }
        // All-null values never compare equal, so no exact match is found.
        {
            let computed_max = ScalarValue::Int32(None);
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
            let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);

            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
            assert_eq!(result, Some(false));
        }
    }
}