datafusion_datasource_parquet/
metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`DFParquetMetadata`] for fetching Parquet file metadata, statistics
//! and schema information.

use crate::{
    apply_file_schema_type_coercions, coerce_int96_to_resolution, ObjectStoreFetch,
};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::compute::and;
use arrow::compute::kernels::cmp::eq;
use arrow::compute::sum;
use arrow::datatypes::{DataType, Schema, SchemaRef, TimeUnit};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, Result, ScalarValue, Statistics,
};
use datafusion_execution::cache::cache_manager::{FileMetadata, FileMetadataCache};
use datafusion_functions_aggregate_common::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_physical_plan::Accumulator;
use log::debug;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Handles fetching Parquet file schema, metadata, and statistics
/// from an object store.
///
/// This component is exposed for low-level integrations through
/// [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
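///
/// # Example
///
/// A minimal usage sketch (marked `ignore`; assumes `store` and
/// `object_meta` already describe an existing Parquet file):
///
/// ```rust,ignore
/// let metadata = DFParquetMetadata::new(&store, &object_meta)
///     .with_metadata_size_hint(Some(64 * 1024))
///     .fetch_metadata()
///     .await?;
/// println!("row groups: {}", metadata.num_row_groups());
/// ```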
#[derive(Debug)]
pub struct DFParquetMetadata<'a> {
    store: &'a dyn ObjectStore,
    object_meta: &'a ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<Arc<FileDecryptionProperties>>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Time unit to coerce INT96 timestamps to
    pub coerce_int96: Option<TimeUnit>,
}

impl<'a> DFParquetMetadata<'a> {
    pub fn new(store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta) -> Self {
        Self {
            store,
            object_meta,
            metadata_size_hint: None,
            decryption_properties: None,
            file_metadata_cache: None,
            coerce_int96: None,
        }
    }

    /// Set the metadata size hint
    pub fn with_metadata_size_hint(mut self, metadata_size_hint: Option<usize>) -> Self {
        self.metadata_size_hint = metadata_size_hint;
        self
    }

    /// Set the decryption properties
    pub fn with_decryption_properties(
        mut self,
        decryption_properties: Option<Arc<FileDecryptionProperties>>,
    ) -> Self {
        self.decryption_properties = decryption_properties;
        self
    }

    /// Set the file metadata cache
    pub fn with_file_metadata_cache(
        mut self,
        file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    ) -> Self {
        self.file_metadata_cache = file_metadata_cache;
        self
    }

    /// Set the time unit to coerce INT96 timestamps to
    pub fn with_coerce_int96(mut self, time_unit: Option<TimeUnit>) -> Self {
        self.coerce_int96 = time_unit;
        self
    }

    /// Fetch the Parquet metadata from the remote object store.
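    ///
    /// If a [`FileMetadataCache`] is configured and the file is not
    /// encrypted, the parsed metadata (including the page index, when
    /// present) is cached and reused on subsequent calls for the same file.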
    pub async fn fetch_metadata(&self) -> Result<Arc<ParquetMetaData>> {
        let Self {
            store,
            object_meta,
            metadata_size_hint,
            decryption_properties,
            file_metadata_cache,
            coerce_int96: _,
        } = self;

        let fetch = ObjectStoreFetch::new(*store, object_meta);

        // Only cache metadata that does not require decryption: when the
        // `parquet_encryption` feature is enabled and decryption properties
        // are set, the metadata is not cached.
        let cache_metadata =
            !cfg!(feature = "parquet_encryption") || decryption_properties.is_none();

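        // Check the cache first and return early on a hit.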
        if cache_metadata {
            if let Some(parquet_metadata) = file_metadata_cache
                .as_ref()
                .and_then(|file_metadata_cache| file_metadata_cache.get(object_meta))
                .and_then(|file_metadata| {
                    file_metadata
                        .as_any()
                        .downcast_ref::<CachedParquetMetaData>()
                        .map(|cached_parquet_metadata| {
                            Arc::clone(cached_parquet_metadata.parquet_metadata())
                        })
                })
            {
                return Ok(parquet_metadata);
            }
        }

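        // Cache miss (or caching disabled): read the metadata from the object store.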
        let mut reader =
            ParquetMetaDataReader::new().with_prefetch_hint(*metadata_size_hint);

        #[cfg(feature = "parquet_encryption")]
        if let Some(decryption_properties) = decryption_properties {
            reader = reader
                .with_decryption_properties(Some(Arc::clone(decryption_properties)));
        }

        if cache_metadata && file_metadata_cache.is_some() {
            // Need to retrieve the entire metadata for the caching to be effective.
            reader = reader.with_page_index_policy(PageIndexPolicy::Optional);
        }

        let metadata = Arc::new(
            reader
                .load_and_finish(fetch, object_meta.size)
                .await
                .map_err(DataFusionError::from)?,
        );

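        // Store the parsed metadata so subsequent reads of this file skip the fetch.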
        if cache_metadata {
            if let Some(file_metadata_cache) = file_metadata_cache {
                file_metadata_cache.put(
                    object_meta,
                    Arc::new(CachedParquetMetaData::new(Arc::clone(&metadata))),
                );
            }
        }

        Ok(metadata)
    }

    /// Read and parse the schema of the Parquet file
    pub async fn fetch_schema(&self) -> Result<Schema> {
        let metadata = self.fetch_metadata().await?;

        let file_metadata = metadata.file_metadata();
        let schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;
        let schema = self
            .coerce_int96
            .as_ref()
            .and_then(|time_unit| {
                coerce_int96_to_resolution(
                    file_metadata.schema_descr(),
                    &schema,
                    time_unit,
                )
            })
            .unwrap_or(schema);
        Ok(schema)
    }

    /// Return a `(path, schema)` tuple by fetching the schema from the Parquet file
    pub(crate) async fn fetch_schema_with_location(&self) -> Result<(Path, Schema)> {
        let loc_path = self.object_meta.location.clone();
        let schema = self.fetch_schema().await?;
        Ok((loc_path, schema))
    }

    /// Fetch the metadata from the Parquet file via [`Self::fetch_metadata`] and convert
    /// the statistics in the metadata using [`Self::statistics_from_parquet_metadata`]
    pub async fn fetch_statistics(&self, table_schema: &SchemaRef) -> Result<Statistics> {
        let metadata = self.fetch_metadata().await?;
        Self::statistics_from_parquet_metadata(&metadata, table_schema)
    }

    /// Convert statistics in [`ParquetMetaData`] into [`Statistics`] using [`StatisticsConverter`]
    ///
    /// The statistics are calculated for each column in the table schema
    /// using the row group statistics in the parquet metadata.
    ///
    /// # Key behaviors
    ///
    /// 1. Extracts row counts and byte sizes from all row groups
    /// 2. Applies schema type coercions to align the file schema with the table schema
    /// 3. Collects and aggregates statistics across row groups when available
    ///
    /// # When there are no statistics
    ///
    /// If the Parquet file doesn't contain any statistics, the function
    /// returns a [`Statistics`] object with:
    /// - Exact row count
    /// - Exact byte size
    /// - All column statistics marked as unknown, via [`Statistics::unknown_column`]
    ///
    /// # When only some columns have statistics
    ///
    /// For columns with statistics:
    /// - Min/max values are extracted and represented as [`Precision::Exact`]
    /// - Null counts are calculated by summing across row groups
    ///
    /// For columns without statistics:
    /// - For min/max, there are two situations:
    ///     1. The column isn't in the Arrow schema: min/max values are set to [`Precision::Absent`]
    ///     2. The column is in the Arrow schema, but not in the Parquet schema
    ///        due to schema evolution: min/max values are set to `Precision::Exact(null)`
    /// - Null counts are set to `Precision::Exact(num_rows)` (conservatively assuming all values could be null)
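    ///
    /// # Example (sketch)
    ///
    /// An illustrative, non-compiled sketch; `parquet_metadata` and
    /// `table_schema` are assumed to be available from an earlier fetch:
    ///
    /// ```rust,ignore
    /// let stats = DFParquetMetadata::statistics_from_parquet_metadata(
    ///     &parquet_metadata,
    ///     &table_schema,
    /// )?;
    /// // Row count and byte size are always exact, even without column statistics
    /// assert!(matches!(stats.num_rows, Precision::Exact(_)));
    /// ```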
    pub fn statistics_from_parquet_metadata(
        metadata: &ParquetMetaData,
        table_schema: &SchemaRef,
    ) -> Result<Statistics> {
        let row_groups_metadata = metadata.row_groups();

        let mut statistics = Statistics::new_unknown(table_schema);
        let mut has_statistics = false;
        let mut num_rows = 0_usize;
        let mut total_byte_size = 0_usize;
        for row_group_meta in row_groups_metadata {
            num_rows += row_group_meta.num_rows() as usize;
            total_byte_size += row_group_meta.total_byte_size() as usize;

            if !has_statistics {
                has_statistics = row_group_meta
                    .columns()
                    .iter()
                    .any(|column| column.statistics().is_some());
            }
        }
        statistics.num_rows = Precision::Exact(num_rows);
        statistics.total_byte_size = Precision::Exact(total_byte_size);

        let file_metadata = metadata.file_metadata();
        let mut file_schema = parquet_to_arrow_schema(
            file_metadata.schema_descr(),
            file_metadata.key_value_metadata(),
        )?;

        if let Some(merged) = apply_file_schema_type_coercions(table_schema, &file_schema)
        {
            file_schema = merged;
        }

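        // Aggregate per-column min/max and null counts across all row groups,
        // tracking whether the aggregated min/max values are exact.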
        statistics.column_statistics = if has_statistics {
            let (mut max_accs, mut min_accs) = create_max_min_accs(table_schema);
            let mut null_counts_array =
                vec![Precision::Exact(0); table_schema.fields().len()];
            let mut is_max_value_exact = vec![Some(true); table_schema.fields().len()];
            let mut is_min_value_exact = vec![Some(true); table_schema.fields().len()];
            table_schema
                .fields()
                .iter()
                .enumerate()
                .for_each(|(idx, field)| {
                    match StatisticsConverter::try_new(
                        field.name(),
                        &file_schema,
                        file_metadata.schema_descr(),
                    ) {
                        Ok(stats_converter) => {
                            let mut accumulators = StatisticsAccumulators {
                                min_accs: &mut min_accs,
                                max_accs: &mut max_accs,
                                null_counts_array: &mut null_counts_array,
                                is_min_value_exact: &mut is_min_value_exact,
                                is_max_value_exact: &mut is_max_value_exact,
                            };
                            summarize_min_max_null_counts(
                                &mut accumulators,
                                idx,
                                &stats_converter,
                                row_groups_metadata,
                            )
                            .ok();
                        }
                        Err(e) => {
                            debug!("Failed to create statistics converter: {e}");
                            null_counts_array[idx] = Precision::Exact(num_rows);
                        }
                    }
                });

            get_col_stats(
                table_schema,
                null_counts_array,
                &mut max_accs,
                &mut min_accs,
                &mut is_max_value_exact,
                &mut is_min_value_exact,
            )
        } else {
            Statistics::unknown_column(table_schema)
        };

        Ok(statistics)
    }
}

/// Min/max aggregation can take Dictionary-encoded input but always produces unpacked
/// (i.e., non-Dictionary) output. We need to adjust the output data type to reflect this.
/// Min/max aggregates produce unpacked output because there is only one min/max value
/// per group; there is no need to keep it Dictionary encoded.
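///
/// For example, `Dictionary(Int32, Utf8)` is mapped to `Utf8`; all other types
/// are returned unchanged.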
fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
    if let DataType::Dictionary(_, value_type) = input_type {
        value_type.as_ref()
    } else {
        input_type
    }
}

fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values: Vec<Option<MaxAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MaxAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    let min_values: Vec<Option<MinAccumulator>> = schema
        .fields()
        .iter()
        .map(|field| {
            MinAccumulator::try_new(min_max_aggregate_data_type(field.data_type())).ok()
        })
        .collect();
    (max_values, min_values)
}

fn get_col_stats(
    schema: &Schema,
    null_counts: Vec<Precision<usize>>,
    max_values: &mut [Option<MaxAccumulator>],
    min_values: &mut [Option<MinAccumulator>],
    is_max_value_exact: &mut [Option<bool>],
    is_min_value_exact: &mut [Option<bool>],
) -> Vec<ColumnStatistics> {
    (0..schema.fields().len())
        .map(|i| {
            let max_value = match (
                max_values.get_mut(i).unwrap(),
                is_max_value_exact.get(i).unwrap(),
            ) {
                (Some(max_value), Some(true)) => {
                    max_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(max_value), Some(false)) | (Some(max_value), None) => {
                    max_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            let min_value = match (
                min_values.get_mut(i).unwrap(),
                is_min_value_exact.get(i).unwrap(),
            ) {
                (Some(min_value), Some(true)) => {
                    min_value.evaluate().ok().map(Precision::Exact)
                }
                (Some(min_value), Some(false)) | (Some(min_value), None) => {
                    min_value.evaluate().ok().map(Precision::Inexact)
                }
                (None, _) => None,
            };
            ColumnStatistics {
                null_count: null_counts[i],
                max_value: max_value.unwrap_or(Precision::Absent),
                min_value: min_value.unwrap_or(Precision::Absent),
                sum_value: Precision::Absent,
                distinct_count: Precision::Absent,
            }
        })
        .collect()
}

/// Holds the accumulator state for collecting statistics from row groups
struct StatisticsAccumulators<'a> {
    min_accs: &'a mut [Option<MinAccumulator>],
    max_accs: &'a mut [Option<MaxAccumulator>],
    null_counts_array: &'a mut [Precision<usize>],
    is_min_value_exact: &'a mut [Option<bool>],
    is_max_value_exact: &'a mut [Option<bool>],
}

fn summarize_min_max_null_counts(
    accumulators: &mut StatisticsAccumulators,
    arrow_schema_index: usize,
    stats_converter: &StatisticsConverter,
    row_groups_metadata: &[RowGroupMetaData],
) -> Result<()> {
    let max_values = stats_converter.row_group_maxes(row_groups_metadata)?;
    let min_values = stats_converter.row_group_mins(row_groups_metadata)?;
    let null_counts = stats_converter.row_group_null_counts(row_groups_metadata)?;
    let is_max_value_exact_stat =
        stats_converter.row_group_is_max_value_exact(row_groups_metadata)?;
    let is_min_value_exact_stat =
        stats_converter.row_group_is_min_value_exact(row_groups_metadata)?;

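    // Fold the row-group maxes/mins into the running accumulators; the result
    // is reported as exact only if an exact row-group statistic produced it.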
    if let Some(max_acc) = &mut accumulators.max_accs[arrow_schema_index] {
        max_acc.update_batch(&[Arc::clone(&max_values)])?;
        let mut cur_max_acc = max_acc.clone();
        accumulators.is_max_value_exact[arrow_schema_index] = has_any_exact_match(
            cur_max_acc.evaluate()?,
            max_values,
            is_max_value_exact_stat,
        );
    }

    if let Some(min_acc) = &mut accumulators.min_accs[arrow_schema_index] {
        min_acc.update_batch(&[Arc::clone(&min_values)])?;
        let mut cur_min_acc = min_acc.clone();
        accumulators.is_min_value_exact[arrow_schema_index] = has_any_exact_match(
            cur_min_acc.evaluate()?,
            min_values,
            is_min_value_exact_stat,
        );
    }

    accumulators.null_counts_array[arrow_schema_index] = match sum(&null_counts) {
        Some(null_count) => Precision::Exact(null_count as usize),
        None => match null_counts.len() {
            // If sum() returned None we either have no rows or all values are null
            0 => Precision::Exact(0),
            _ => Precision::Absent,
        },
    };

    Ok(())
}

/// Checks if any occurrence of `value` in `array` corresponds to a `true`
/// entry in the `exactness` array.
///
/// This is used to determine if a calculated statistic (e.g., min or max)
/// is exact, by checking if at least one of its source values was exact.
///
/// # Example
/// - `value`: `0`
/// - `array`: `[0, 1, 0, 3, 0, 5]`
/// - `exactness`: `[true, false, false, false, false, false]`
///
/// The value `0` appears at indices `[0, 2, 4]`. The corresponding exactness
/// values are `[true, false, false]`. Since at least one is `true`, the
/// function returns `Some(true)`.
fn has_any_exact_match(
    value: ScalarValue,
    array: ArrayRef,
    exactness: BooleanArray,
) -> Option<bool> {
    let scalar_array = value.to_scalar().ok()?;
    let eq_mask = eq(&scalar_array, &array).ok()?;
    let combined_mask = and(&eq_mask, &exactness).ok()?;
    Some(combined_mask.true_count() > 0)
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{ArrayRef, BooleanArray, Int32Array};
    use datafusion_common::ScalarValue;
    use std::sync::Arc;

    #[test]
    fn test_has_any_exact_match() {
        // Case 1: Mixed exact and inexact matches
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![true, false, false, false, false, false]);

            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
            assert_eq!(result, Some(true));
        }
        // Case 2: All inexact matches
        {
            let computed_min = ScalarValue::Int32(Some(0));
            let row_group_mins =
                Arc::new(Int32Array::from(vec![0, 1, 0, 3, 0, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, false, false, false, false, false]);

            let result = has_any_exact_match(computed_min, row_group_mins, exactness);
            assert_eq!(result, Some(false));
        }
        // Case 3: All exact matches
        {
            let computed_max = ScalarValue::Int32(Some(5));
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![1, 5, 3, 5, 2, 5])) as ArrayRef;
            let exactness =
                BooleanArray::from(vec![false, true, true, true, false, true]);

            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
            assert_eq!(result, Some(true));
        }
        // Case 4: All maxes are null values
        {
            let computed_max = ScalarValue::Int32(None);
            let row_group_maxes =
                Arc::new(Int32Array::from(vec![None, None, None, None])) as ArrayRef;
            let exactness = BooleanArray::from(vec![None, Some(true), None, Some(false)]);

            let result = has_any_exact_match(computed_max, row_group_maxes, exactness);
            assert_eq!(result, Some(false));
        }
    }
}