datafusion_datasource_parquet/reader.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetFileReaderFactory`] and [`DefaultParquetFileReaderFactory`] for
//! low-level control of parquet file readers

use crate::metadata::DFParquetMetadata;
use crate::ParquetFileMetrics;
use bytes::Bytes;
use datafusion_datasource::PartitionedFile;
use datafusion_execution::cache::cache_manager::FileMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// Interface for reading parquet files.
///
/// The combined implementations of [`ParquetFileReaderFactory`] and
/// [`AsyncFileReader`] can be used to provide custom data access operations
/// such as pre-cached metadata, I/O coalescing, etc.
///
/// See [`DefaultParquetFileReaderFactory`] for a simple implementation.
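///
/// # Example
///
/// A minimal sketch of a custom factory that layers extra behavior on top of
/// [`DefaultParquetFileReaderFactory`]. `LoggingReaderFactory` is purely
/// illustrative and not part of this crate:
///
/// ```ignore
/// #[derive(Debug)]
/// struct LoggingReaderFactory {
///     inner: DefaultParquetFileReaderFactory,
/// }
///
/// impl ParquetFileReaderFactory for LoggingReaderFactory {
///     fn create_reader(
///         &self,
///         partition_index: usize,
///         partitioned_file: PartitionedFile,
///         metadata_size_hint: Option<usize>,
///         metrics: &ExecutionPlanMetricsSet,
///     ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
///         // Log which file is opened, then delegate all I/O to the
///         // default implementation
///         println!("opening {}", partitioned_file.object_meta.location);
///         self.inner.create_reader(
///             partition_index,
///             partitioned_file,
///             metadata_size_hint,
///             metrics,
///         )
///     }
/// }
/// ```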
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
    /// Provides an `AsyncFileReader` for reading data from the specified
    /// parquet file
    ///
    /// # Notes
    ///
    /// If the resulting [`AsyncFileReader`] returns `ParquetMetaData` without
    /// page index information, the reader will load it on demand. Thus it is
    /// important to ensure that the returned `ParquetMetaData` has the
    /// necessary information if you wish to avoid a subsequent I/O.
    ///
    /// # Arguments
    /// * `partition_index` - Index of the partition (for reporting metrics)
    /// * `file` - The file to be read
    /// * `metadata_size_hint` - If specified, the first I/O reads this many bytes from the footer
    /// * `metrics` - Execution metrics
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
}

/// Default implementation of [`ParquetFileReaderFactory`]
///
/// This implementation:
/// 1. Reads parquet directly from an underlying [`ObjectStore`] instance.
/// 2. Reads the footer and page metadata on demand.
/// 3. Does not cache metadata or coalesce I/O operations.
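///
/// # Example
///
/// A minimal sketch of creating the factory and a per-file reader. The store,
/// file path, and size are illustrative; `create_reader` is normally invoked
/// by the parquet data source itself:
///
/// ```ignore
/// use std::sync::Arc;
/// use object_store::memory::InMemory;
///
/// let store = Arc::new(InMemory::new());
/// let factory = DefaultParquetFileReaderFactory::new(store);
///
/// let file = PartitionedFile::new("data/example.parquet", 1234);
/// let metrics = ExecutionPlanMetricsSet::new();
/// let reader = factory.create_reader(0, file, None, &metrics)?;
/// ```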
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
    /// Create a new `DefaultParquetFileReaderFactory`.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage.
///
/// This implementation uses the [`ParquetObjectReader`] to read data from the
/// object store on demand, as required, tracking the number of bytes read.
///
/// This implementation does not coalesce I/O operations or cache bytes. Such
/// optimizations can be done either at the object store level or by providing a
/// custom implementation of [`ParquetFileReaderFactory`].
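///
/// # Example
///
/// A sketch of constructing the reader by hand (normally done via
/// [`DefaultParquetFileReaderFactory`]); `store`, `path`, `file_size`, and
/// `metrics` are assumed to exist in the surrounding code:
///
/// ```ignore
/// let inner = ParquetObjectReader::new(store, path).with_file_size(file_size);
/// let reader = ParquetFileReader {
///     file_metrics: ParquetFileMetrics::new(0, "example.parquet", &metrics),
///     inner,
/// };
/// ```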
pub struct ParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    pub inner: ParquetObjectReader,
}

impl AsyncFileReader for ParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Record the bytes requested before delegating to the inner reader
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata(options)
    }
}

impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(ParquetFileReader {
            inner,
            file_metrics,
        }))
    }
}

/// Implementation of [`ParquetFileReaderFactory`] that caches footer and page
/// metadata. Reads the [`FileMetadataCache`] and updates it with the
/// [`ParquetMetaData`]. Readers created by this factory always load the entire
/// metadata (including the page index, unless the file is encrypted), even if
/// the current query does not require it, to ensure it is available to any
/// subsequent consumer that needs it.
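///
/// # Example
///
/// A sketch of constructing the factory. It assumes a `runtime_env` whose
/// cache manager exposes the file metadata cache, as in recent DataFusion
/// versions:
///
/// ```ignore
/// let metadata_cache = runtime_env.cache_manager.get_file_metadata_cache();
/// let factory = CachedParquetFileReaderFactory::new(store, metadata_cache);
/// ```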
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
    metadata_cache: Arc<dyn FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
    /// Create a new `CachedParquetFileReaderFactory` backed by the given
    /// object store and metadata cache.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> Self {
        Self {
            store,
            metadata_cache,
        }
    }
}

impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);

        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint);
        }

        Ok(Box::new(CachedParquetFileReader {
            store: Arc::clone(&self.store),
            inner,
            file_metrics,
            partitioned_file,
            metadata_cache: Arc::clone(&self.metadata_cache),
            metadata_size_hint,
        }))
    }
}

/// Implements [`AsyncFileReader`] for a parquet file in object storage. Reads the file metadata
/// from the [`FileMetadataCache`], if available, otherwise reads it directly from the file and then
/// updates the cache.
pub struct CachedParquetFileReader {
    pub file_metrics: ParquetFileMetrics,
    store: Arc<dyn ObjectStore>,
    pub inner: ParquetObjectReader,
    partitioned_file: PartitionedFile,
    metadata_cache: Arc<dyn FileMetadataCache>,
    metadata_size_hint: Option<usize>,
}

impl AsyncFileReader for CachedParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Record the bytes requested before delegating to the inner reader
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        // `options` is only read when the `parquet_encryption` feature is on
        #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let object_meta = self.partitioned_file.object_meta.clone();
        let metadata_cache = Arc::clone(&self.metadata_cache);

        async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = options
                .and_then(|o| o.file_decryption_properties())
                .map(Arc::clone);

            #[cfg(not(feature = "parquet_encryption"))]
            let file_decryption_properties = None;

            // DFParquetMetadata consults the cache first and updates it after
            // reading the metadata from the file
            DFParquetMetadata::new(&self.store, &object_meta)
                .with_decryption_properties(file_decryption_properties)
                .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
                .with_metadata_size_hint(self.metadata_size_hint)
                .fetch_metadata()
                .await
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!(
                        "Failed to fetch metadata for file {}: {e}",
                        object_meta.location,
                    ))
                })
        }
        .boxed()
    }
}

/// Wrapper to implement [`FileMetadata`] for [`ParquetMetaData`].
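///
/// # Example
///
/// A sketch of recovering the [`ParquetMetaData`] from a cache entry via
/// [`FileMetadata::as_any`]; the `metadata_cache.get` call is illustrative of
/// a cache lookup keyed by `ObjectMeta`:
///
/// ```ignore
/// let entry: Arc<dyn FileMetadata> = metadata_cache.get(&object_meta).unwrap();
/// let cached = entry
///     .as_any()
///     .downcast_ref::<CachedParquetMetaData>()
///     .expect("cached entry should be parquet metadata");
/// let metadata: &Arc<ParquetMetaData> = cached.parquet_metadata();
/// ```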
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    /// Wrap the given [`ParquetMetaData`] so it can be stored in a
    /// [`FileMetadataCache`].
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    /// Return a reference to the wrapped [`ParquetMetaData`].
    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        // The page index is only considered present when both the column
        // index and the offset index were loaded
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}