// datafusion_datasource_parquet/reader.rs

use crate::metadata::DFParquetMetadata;
use crate::ParquetFileMetrics;
use bytes::Bytes;
use datafusion_datasource::PartitionedFile;
use datafusion_execution::cache::cache_manager::FileMetadata;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::future::BoxFuture;
use futures::FutureExt;
use object_store::ObjectStore;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::{AsyncFileReader, ParquetObjectReader};
use parquet::file::metadata::ParquetMetaData;
use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;

/// Interface for creating [`AsyncFileReader`]s to read Parquet files.
///
/// Implementations can provide custom I/O (e.g. pre-cached data or metadata)
/// while recording per-file metrics.
pub trait ParquetFileReaderFactory: Debug + Send + Sync + 'static {
    /// Provides an `AsyncFileReader` for reading data from a Parquet file.
    /// `metadata_size_hint`, if provided, is the expected size of the file's
    /// footer, which can reduce the number of I/O requests needed to read the
    /// metadata.
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>>;
}

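// A hedged sketch of implementing the trait: a wrapper factory (illustrative,
// not part of this crate) that delegates to `DefaultParquetFileReaderFactory`
// below but always supplies a fixed footer size hint. The 512 KiB value is an
// assumption for demonstration.
#[allow(dead_code)]
#[derive(Debug)]
struct FixedHintReaderFactory {
    inner: DefaultParquetFileReaderFactory,
}

impl ParquetFileReaderFactory for FixedHintReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        // Ignore the caller's hint and use a fixed one below
        _metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        self.inner.create_reader(
            partition_index,
            partitioned_file,
            Some(512 * 1024),
            metrics,
        )
    }
}
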
/// Default implementation of [`ParquetFileReaderFactory`].
///
/// Creates [`ParquetFileReader`]s that read directly from an [`ObjectStore`],
/// without caching the Parquet metadata.
#[derive(Debug)]
pub struct DefaultParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
}

impl DefaultParquetFileReaderFactory {
    /// Create a factory that reads from the given object store.
    pub fn new(store: Arc<dyn ObjectStore>) -> Self {
        Self { store }
    }
}

/// An [`AsyncFileReader`] for a Parquet file in an [`ObjectStore`] that
/// records I/O metrics as the file is read.
pub struct ParquetFileReader {
    /// Metrics (e.g. bytes scanned) for this file
    pub file_metrics: ParquetFileMetrics,
    /// The underlying reader
    pub inner: ParquetObjectReader,
}

impl AsyncFileReader for ParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        self.inner.get_metadata(options)
    }
}

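// A minimal sketch of the metric accounting above: `example_bytes_scanned` is
// illustrative and assumes the reader points at a file with at least 100
// bytes. Each ranged read adds its length to `bytes_scanned` before the I/O
// is delegated to the inner `ParquetObjectReader`.
#[allow(dead_code)]
async fn example_bytes_scanned(
    reader: &mut ParquetFileReader,
) -> parquet::errors::Result<()> {
    let _bytes = reader.get_bytes(0..100).await?;
    // The counter reflects the requested range, not the decoded output
    assert!(reader.file_metrics.bytes_scanned.value() >= 100);
    Ok(())
}
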
impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);
        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint)
        }

        Ok(Box::new(ParquetFileReader {
            inner,
            file_metrics,
        }))
    }
}

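// A minimal usage sketch, assuming an in-memory `object_store` and a
// placeholder path/size; `example_default_reader` is illustrative and not
// part of the DataFusion API.
#[allow(dead_code)]
fn example_default_reader(
) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
    use object_store::memory::InMemory;

    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    let factory = DefaultParquetFileReaderFactory::new(store);

    // Path and size are placeholders; in practice they come from the file
    // listing phase of query planning
    let file = PartitionedFile::new("data/example.parquet", 1024);
    let metrics = ExecutionPlanMetricsSet::new();

    // Passing a metadata size hint instead of `None` lets the reader fetch
    // the footer in fewer requests when the hint covers the full metadata
    factory.create_reader(0, file, None, &metrics)
}
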
/// Implementation of [`ParquetFileReaderFactory`] that caches Parquet file
/// metadata in a [`FileMetadataCache`], so repeated scans of the same file do
/// not need to re-fetch and re-parse the footer.
#[derive(Debug)]
pub struct CachedParquetFileReaderFactory {
    store: Arc<dyn ObjectStore>,
    metadata_cache: Arc<dyn FileMetadataCache>,
}

impl CachedParquetFileReaderFactory {
    /// Create a factory that reads from the given object store and caches
    /// metadata in the given cache.
    pub fn new(
        store: Arc<dyn ObjectStore>,
        metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> Self {
        Self {
            store,
            metadata_cache,
        }
    }
}

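// A hedged construction sketch: this assumes recent DataFusion versions,
// where the `RuntimeEnv`'s `CacheManager` exposes a default
// `FileMetadataCache` via `get_file_metadata_cache`; `example_cached_factory`
// is illustrative only.
#[allow(dead_code)]
fn example_cached_factory() -> CachedParquetFileReaderFactory {
    use datafusion_execution::runtime_env::RuntimeEnv;
    use object_store::memory::InMemory;

    let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    // Readers created by this factory consult the cache before re-fetching
    // and re-parsing Parquet footers
    let runtime_env = RuntimeEnv::default();
    let metadata_cache = runtime_env.cache_manager.get_file_metadata_cache();
    CachedParquetFileReaderFactory::new(store, metadata_cache)
}
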
impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> datafusion_common::Result<Box<dyn AsyncFileReader + Send>> {
        let file_metrics = ParquetFileMetrics::new(
            partition_index,
            partitioned_file.object_meta.location.as_ref(),
            metrics,
        );
        let store = Arc::clone(&self.store);

        let mut inner = ParquetObjectReader::new(
            store,
            partitioned_file.object_meta.location.clone(),
        )
        .with_file_size(partitioned_file.object_meta.size);

        if let Some(hint) = metadata_size_hint {
            inner = inner.with_footer_size_hint(hint)
        }

        Ok(Box::new(CachedParquetFileReader {
            store: Arc::clone(&self.store),
            inner,
            file_metrics,
            partitioned_file,
            metadata_cache: Arc::clone(&self.metadata_cache),
            metadata_size_hint,
        }))
    }
}

/// An [`AsyncFileReader`] that checks a [`FileMetadataCache`] before fetching
/// and parsing the Parquet metadata from object storage.
pub struct CachedParquetFileReader {
    /// Metrics (e.g. bytes scanned) for this file
    pub file_metrics: ParquetFileMetrics,
    store: Arc<dyn ObjectStore>,
    /// The underlying reader
    pub inner: ParquetObjectReader,
    partitioned_file: PartitionedFile,
    metadata_cache: Arc<dyn FileMetadataCache>,
    metadata_size_hint: Option<usize>,
}

impl AsyncFileReader for CachedParquetFileReader {
    fn get_bytes(
        &mut self,
        range: Range<u64>,
    ) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        let bytes_scanned = range.end - range.start;
        self.file_metrics.bytes_scanned.add(bytes_scanned as usize);
        self.inner.get_bytes(range)
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let total: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.file_metrics.bytes_scanned.add(total as usize);
        self.inner.get_byte_ranges(ranges)
    }

    fn get_metadata<'a>(
        &'a mut self,
        #[allow(unused_variables)] options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let object_meta = self.partitioned_file.object_meta.clone();
        let metadata_cache = Arc::clone(&self.metadata_cache);

        async move {
            #[cfg(feature = "parquet_encryption")]
            let file_decryption_properties = options
                .and_then(|o| o.file_decryption_properties())
                .map(Arc::clone);

            #[cfg(not(feature = "parquet_encryption"))]
            let file_decryption_properties = None;

            // Delegate to DFParquetMetadata, which consults the
            // FileMetadataCache before fetching the footer from object storage
            DFParquetMetadata::new(&self.store, &object_meta)
                .with_decryption_properties(file_decryption_properties)
                .with_file_metadata_cache(Some(Arc::clone(&metadata_cache)))
                .with_metadata_size_hint(self.metadata_size_hint)
                .fetch_metadata()
                .await
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!(
                        "Failed to fetch metadata for file {}: {e}",
                        object_meta.location,
                    ))
                })
        }
        .boxed()
    }
}

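// A hedged sketch of the caching behavior of `get_metadata` above: the first
// call fetches and parses the footer and stores it in the cache; later calls
// for an unchanged file are served from the cache. `example_metadata_reuse`
// is illustrative and assumes `reader` points at a valid Parquet file.
#[allow(dead_code)]
async fn example_metadata_reuse(
    reader: &mut CachedParquetFileReader,
) -> parquet::errors::Result<()> {
    let _first = reader.get_metadata(None).await?; // populates the cache
    let _second = reader.get_metadata(None).await?; // served from the cache
    Ok(())
}
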
/// Wrapper for [`ParquetMetaData`] so it can be stored in a
/// [`FileMetadataCache`].
pub struct CachedParquetMetaData(Arc<ParquetMetaData>);

impl CachedParquetMetaData {
    pub fn new(metadata: Arc<ParquetMetaData>) -> Self {
        Self(metadata)
    }

    /// Returns the wrapped metadata
    pub fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
        &self.0
    }
}

impl FileMetadata for CachedParquetMetaData {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn memory_size(&self) -> usize {
        self.0.memory_size()
    }

    fn extra_info(&self) -> HashMap<String, String> {
        // Report whether the page index (column and offset indexes) was loaded
        let page_index =
            self.0.column_index().is_some() && self.0.offset_index().is_some();
        HashMap::from([("page_index".to_owned(), page_index.to_string())])
    }
}
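
// A small sketch of how a cache can inspect an entry: `describe_cache_entry`
// is illustrative; `memory_size` drives the cache's memory accounting and
// `extra_info` reports whether the page index was loaded.
#[allow(dead_code)]
fn describe_cache_entry(
    metadata: Arc<ParquetMetaData>,
) -> (usize, HashMap<String, String>) {
    let entry = CachedParquetMetaData::new(metadata);
    (entry.memory_size(), entry.extra_info())
}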