datafusion_datasource_parquet/file_format.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetFormat`]: Parquet [`FileFormat`] abstractions

use std::any::Any;
use std::cell::RefCell;
use std::fmt::Debug;
use std::ops::Range;
use std::rc::Rc;
use std::sync::Arc;
use std::{fmt, vec};

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{
    get_writer_schema, ObjectWriterBuilder, SharedBuffer,
};

use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::encryption::FileDecryptionProperties;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
    internal_datafusion_err, internal_err, not_impl_err, DataFusionError, GetExt,
    HashSet, Result, DEFAULT_PARQUET_EXTENSION,
};
use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion_session::Session;

use crate::metadata::DFParquetMetadata;
use crate::reader::CachedParquetFileReaderFactory;
use crate::source::{parse_coerce_int96_string, ParquetSource};
use async_trait::async_trait;
use bytes::Bytes;
use datafusion_datasource::source::DataSourceExec;
use datafusion_execution::cache::cache_manager::FileMetadataCache;
use datafusion_execution::runtime_env::RuntimeEnv;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryStreamExt};
use object_store::buffered::BufWriter;
use object_store::path::Path;
use object_store::{ObjectMeta, ObjectStore};
use parquet::arrow::arrow_writer::{
    compute_leaves, ArrowColumnChunk, ArrowColumnWriter, ArrowLeafColumn,
    ArrowRowGroupWriterFactory, ArrowWriterOptions,
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{ArrowWriter, AsyncArrowWriter};
use parquet::basic::Type;
#[cfg(feature = "parquet_encryption")]
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::errors::ParquetError;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

/// Initial writing buffer size. Note this is just a size hint for efficiency. It
/// will grow beyond the set value if needed.
const INITIAL_BUFFER_BYTES: usize = 1048576;

/// When writing parquet files in parallel, if the buffered Parquet data exceeds
/// this size, it is flushed to object store
const BUFFER_FLUSH_BYTES: usize = 1024000;

#[derive(Default)]
/// Factory struct used to create [ParquetFormat]
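/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// use datafusion_common::config::TableParquetOptions;
///
/// // Start from default options and override a single field; the option field
/// // shown here is an assumption about `TableParquetOptions`.
/// let mut options = TableParquetOptions::default();
/// options.global.pruning = true;
/// let factory = ParquetFormatFactory::new_with_options(options);
/// ```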
pub struct ParquetFormatFactory {
    /// inner options for parquet
    pub options: Option<TableParquetOptions>,
}

impl ParquetFormatFactory {
    /// Creates an instance of [ParquetFormatFactory]
    pub fn new() -> Self {
        Self { options: None }
    }

    /// Creates an instance of [ParquetFormatFactory] with customized default options
    pub fn new_with_options(options: TableParquetOptions) -> Self {
        Self {
            options: Some(options),
        }
    }
}

impl FileFormatFactory for ParquetFormatFactory {
    fn create(
        &self,
        state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> Result<Arc<dyn FileFormat>> {
        let parquet_options = match &self.options {
            None => {
                let mut table_options = state.default_table_options();
                table_options.set_config_format(ConfigFileType::PARQUET);
                table_options.alter_with_string_hash_map(format_options)?;
                table_options.parquet
            }
            Some(parquet_options) => {
                let mut parquet_options = parquet_options.clone();
                for (k, v) in format_options {
                    parquet_options.set(k, v)?;
                }
                parquet_options
            }
        };

        Ok(Arc::new(
            ParquetFormat::default().with_options(parquet_options),
        ))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(ParquetFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl GetExt for ParquetFormatFactory {
    fn get_ext(&self) -> String {
        // Removes the dot, i.e. ".parquet" -> "parquet"
        DEFAULT_PARQUET_EXTENSION[1..].to_string()
    }
}

impl Debug for ParquetFormatFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetFormatFactory")
            .field("ParquetFormatFactory", &self.options)
            .finish()
    }
}

/// The Apache Parquet `FileFormat` implementation
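///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// // Configure a format instance via the builder-style methods below.
/// let format = ParquetFormat::new()
///     .with_enable_pruning(true)
///     .with_skip_metadata(true)
///     .with_metadata_size_hint(Some(16 * 1024));
/// ```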
#[derive(Debug, Default)]
pub struct ParquetFormat {
    options: TableParquetOptions,
}

impl ParquetFormat {
    /// Construct a new Format with no local overrides
    pub fn new() -> Self {
        Self::default()
    }

    /// Activate statistics-based row group level pruning
    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
        self.options.global.pruning = enable;
        self
    }

    /// Return `true` if pruning is enabled
    pub fn enable_pruning(&self) -> bool {
        self.options.global.pruning
    }

    /// Provide a hint of the size of the file metadata. If a hint is provided,
    /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
    /// Without a hint, two reads are required: one to fetch the 8-byte parquet footer and
    /// another to fetch the metadata length encoded in the footer.
    ///
    /// - If `None`, defaults to the value in `config_options`
    pub fn with_metadata_size_hint(mut self, size_hint: Option<usize>) -> Self {
        self.options.global.metadata_size_hint = size_hint;
        self
    }

    /// Return the metadata size hint if set
    pub fn metadata_size_hint(&self) -> Option<usize> {
        self.options.global.metadata_size_hint
    }

    /// Tell the parquet reader to skip any metadata that may be in
    /// the file Schema. This can help avoid schema conflicts due to
    /// metadata.
    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
        self.options.global.skip_metadata = skip_metadata;
        self
    }

    /// Returns `true` if schema metadata will be cleared prior to
    /// schema merging.
    pub fn skip_metadata(&self) -> bool {
        self.options.global.skip_metadata
    }

    /// Set Parquet options for the ParquetFormat
    pub fn with_options(mut self, options: TableParquetOptions) -> Self {
        self.options = options;
        self
    }

    /// Parquet options
    pub fn options(&self) -> &TableParquetOptions {
        &self.options
    }

    /// Return `true` if view types should be used.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read string / binary columns using the view types `StringView` or `BinaryView`
    /// if the table schema specifies those types, regardless of any embedded metadata
    /// that may specify an alternate Arrow type. The parquet reader is optimized
    /// for reading `StringView` and `BinaryView` and such queries are significantly faster.
    ///
    /// If this returns false, the parquet reader will read the columns according to the
    /// defaults or any embedded Arrow type information. This may result in reading
    /// `StringArray`s and then casting to `StringViewArray`, which is less efficient.
    pub fn force_view_types(&self) -> bool {
        self.options.global.schema_force_view_types
    }

    /// If true, will use view types. See [`Self::force_view_types`] for details
    pub fn with_force_view_types(mut self, use_views: bool) -> Self {
        self.options.global.schema_force_view_types = use_views;
        self
    }

    /// Return `true` if binary types will be read as strings.
    ///
    /// If this returns true, DataFusion will instruct the parquet reader
    /// to read binary columns such as `Binary` or `BinaryView` as the
    /// corresponding string type such as `Utf8` or `LargeUtf8`.
    /// The parquet reader has special optimizations for `Utf8` and `LargeUtf8`
    /// validation, and such queries are significantly faster than reading
    /// binary columns and then casting to string columns.
    pub fn binary_as_string(&self) -> bool {
        self.options.global.binary_as_string
    }

    /// If true, will read binary types as strings. See [`Self::binary_as_string`] for details
    pub fn with_binary_as_string(mut self, binary_as_string: bool) -> Self {
        self.options.global.binary_as_string = binary_as_string;
        self
    }

    /// Return the time unit that INT96 timestamps should be coerced to, if set.
    /// See [`Self::with_coerce_int96`] for details
    pub fn coerce_int96(&self) -> Option<String> {
        self.options.global.coerce_int96.clone()
    }

    /// If set, coerce INT96 timestamp columns to the given time unit when
    /// inferring the schema
    pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
        self.options.global.coerce_int96 = time_unit;
        self
    }
}

/// Clears all metadata (Schema level and field level) on an iterator
/// of Schemas
fn clear_metadata(
    schemas: impl IntoIterator<Item = Schema>,
) -> impl Iterator<Item = Schema> {
    schemas.into_iter().map(|schema| {
        let fields = schema
            .fields()
            .iter()
            .map(|field| {
                field.as_ref().clone().with_metadata(Default::default()) // clear meta
            })
            .collect::<Fields>();
        Schema::new(fields)
    })
}

#[cfg(feature = "parquet_encryption")]
async fn get_file_decryption_properties(
    state: &dyn Session,
    options: &TableParquetOptions,
    file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(match &options.crypto.file_decryption {
        Some(cfd) => Some(Arc::new(FileDecryptionProperties::from(cfd.clone()))),
        None => match &options.crypto.factory_id {
            Some(factory_id) => {
                let factory =
                    state.runtime_env().parquet_encryption_factory(factory_id)?;
                factory
                    .get_file_decryption_properties(
                        &options.crypto.factory_options,
                        file_path,
                    )
                    .await?
            }
            None => None,
        },
    })
}

#[cfg(not(feature = "parquet_encryption"))]
async fn get_file_decryption_properties(
    _state: &dyn Session,
    _options: &TableParquetOptions,
    _file_path: &Path,
) -> Result<Option<Arc<FileDecryptionProperties>>> {
    Ok(None)
}

#[async_trait]
impl FileFormat for ParquetFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        ParquetFormatFactory::new().get_ext()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> Result<String> {
        let ext = self.get_ext();
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
            _ => internal_err!("Parquet FileFormat does not support compression."),
        }
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> Result<SchemaRef> {
        let coerce_int96 = match self.coerce_int96() {
            Some(time_unit) => Some(parse_coerce_int96_string(time_unit.as_str())?),
            None => None,
        };

        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut schemas: Vec<_> = futures::stream::iter(objects)
            .map(|object| async {
                let file_decryption_properties = get_file_decryption_properties(
                    state,
                    &self.options,
                    &object.location,
                )
                .await?;
                let result = DFParquetMetadata::new(store.as_ref(), object)
                    .with_metadata_size_hint(self.metadata_size_hint())
                    .with_decryption_properties(file_decryption_properties)
                    .with_file_metadata_cache(Some(Arc::clone(&file_metadata_cache)))
                    .with_coerce_int96(coerce_int96)
                    .fetch_schema_with_location()
                    .await?;
                Ok::<_, DataFusionError>(result)
            })
            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
            // fetch schemas concurrently, if requested
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // Schema inference adds fields based on the order they are seen,
        // which depends on the order the files are processed. For some
        // object stores (like local file systems) the order returned from list
        // is not deterministic. Thus, to ensure deterministic schema inference,
        // sort the files first.
        // https://github.com/apache/datafusion/pull/6629
        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));

        let schemas = schemas
            .into_iter()
            .map(|(_, schema)| schema)
            .collect::<Vec<_>>();

        let schema = if self.skip_metadata() {
            Schema::try_merge(clear_metadata(schemas))
        } else {
            Schema::try_merge(schemas)
        }?;

        let schema = if self.binary_as_string() {
            transform_binary_to_string(&schema)
        } else {
            schema
        };

        let schema = if self.force_view_types() {
            transform_schema_to_view(&schema)
        } else {
            schema
        };

        Ok(Arc::new(schema))
    }

    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> Result<Statistics> {
        let file_decryption_properties =
            get_file_decryption_properties(state, &self.options, &object.location)
                .await?;
        let file_metadata_cache =
            state.runtime_env().cache_manager.get_file_metadata_cache();
        DFParquetMetadata::new(store, object)
            .with_metadata_size_hint(self.metadata_size_hint())
            .with_decryption_properties(file_decryption_properties)
            .with_file_metadata_cache(Some(file_metadata_cache))
            .fetch_statistics(&table_schema)
            .await
    }

    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        conf: FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut metadata_size_hint = None;

        if let Some(metadata) = self.metadata_size_hint() {
            metadata_size_hint = Some(metadata);
        }

        let mut source = ParquetSource::new(self.options.clone());

        // Use the CachedParquetFileReaderFactory
        let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
        let store = state
            .runtime_env()
            .object_store(conf.object_store_url.clone())?;
        let cached_parquet_read_factory =
            Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
        source = source.with_parquet_file_reader_factory(cached_parquet_read_factory);

        if let Some(metadata_size_hint) = metadata_size_hint {
            source = source.with_metadata_size_hint(metadata_size_hint)
        }

        source = self.set_source_encryption_factory(source, state)?;

        // Apply schema adapter factory before building the new config
        let file_source = source.apply_schema_adapter(&conf)?;

        let conf = FileScanConfigBuilder::from(conf)
            .with_source(file_source)
            .build();
        Ok(DataSourceExec::from_data_source(conf))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Parquet");
        }

        let sink = Arc::new(ParquetSink::new(conf, self.options.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(ParquetSource::default())
    }
}

#[cfg(feature = "parquet_encryption")]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Ok(source.with_encryption_factory(
                state
                    .runtime_env()
                    .parquet_encryption_factory(encryption_factory_id)?,
            ))
        } else {
            Ok(source)
        }
    }
}

#[cfg(not(feature = "parquet_encryption"))]
impl ParquetFormat {
    fn set_source_encryption_factory(
        &self,
        source: ParquetSource,
        _state: &dyn Session,
    ) -> Result<ParquetSource> {
        if let Some(encryption_factory_id) = &self.options.crypto.factory_id {
            Err(DataFusionError::Configuration(
                format!("Parquet encryption factory id is set to '{encryption_factory_id}' but the parquet_encryption feature is disabled")))
        } else {
            Ok(source)
        }
    }
}

/// Apply necessary schema type coercions to make file schema match table schema.
///
/// This function performs two main types of transformations in a single pass:
/// 1. Binary types to string types conversion - Converts binary data types to their
///    corresponding string types when the table schema expects string data
/// 2. Regular to view types conversion - Converts standard string/binary types to
///    view types when the table schema uses view types
///
/// # Arguments
/// * `table_schema` - The table schema containing the desired types
/// * `file_schema` - The file schema to be transformed
///
/// # Returns
/// * `Some(Schema)` - If any transformations were applied, returns the transformed schema
/// * `None` - If no transformations were needed
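///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // Table schema expects a string view column; the file stores plain binary.
/// let table_schema = Schema::new(vec![Field::new("c", DataType::Utf8View, true)]);
/// let file_schema = Schema::new(vec![Field::new("c", DataType::Binary, true)]);
/// let coerced = apply_file_schema_type_coercions(&table_schema, &file_schema)
///     .expect("a coercion is needed here");
/// assert_eq!(coerced.field(0).data_type(), &DataType::Utf8View);
/// ```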
pub fn apply_file_schema_type_coercions(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut needs_view_transform = false;
    let mut needs_string_transform = false;

    // Create a mapping of table field names to their data types for fast lookup
    // and simultaneously check if we need any transformations
    let table_fields: HashMap<_, _> = table_schema
        .fields()
        .iter()
        .map(|f| {
            let dt = f.data_type();
            // Check if we need view type transformation
            if matches!(dt, &DataType::Utf8View | &DataType::BinaryView) {
                needs_view_transform = true;
            }
            // Check if we need string type transformation
            if matches!(
                dt,
                &DataType::Utf8 | &DataType::LargeUtf8 | &DataType::Utf8View
            ) {
                needs_string_transform = true;
            }

            (f.name(), dt)
        })
        .collect();

    // Early return if no transformation needed
    if !needs_view_transform && !needs_string_transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields()
        .iter()
        .map(|field| {
            let field_name = field.name();
            let field_type = field.data_type();

            // Look up the corresponding field type in the table schema
            if let Some(table_type) = table_fields.get(field_name) {
                match (table_type, field_type) {
                    // table schema uses string type, coerce the file schema to use string type
                    (
                        &DataType::Utf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8);
                    }
                    // table schema uses large string type, coerce the file schema to use large string type
                    (
                        &DataType::LargeUtf8,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::LargeUtf8);
                    }
                    // table schema uses string view type, coerce the file schema to use view type
                    (
                        &DataType::Utf8View,
                        DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                    ) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    // Handle view type conversions
                    (&DataType::Utf8View, DataType::Utf8 | DataType::LargeUtf8) => {
                        return field_with_new_type(field, DataType::Utf8View);
                    }
                    (&DataType::BinaryView, DataType::Binary | DataType::LargeBinary) => {
                        return field_with_new_type(field, DataType::BinaryView);
                    }
                    _ => {}
                }
            }

            // If no transformation is needed, keep the original field
            Arc::clone(field)
        })
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

/// Coerces the file schema's timestamps to the provided `TimeUnit` if the Parquet schema contains INT96 columns.
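///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// use arrow::datatypes::TimeUnit;
///
/// // `parquet_schema` is the file's `SchemaDescriptor` and `file_schema` the Arrow
/// // schema inferred from it; both are assumed to come from the parquet metadata.
/// if let Some(coerced) =
///     coerce_int96_to_resolution(&parquet_schema, &file_schema, &TimeUnit::Microsecond)
/// {
///     // Any INT96-backed nanosecond timestamps are now microsecond timestamps.
/// }
/// ```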
pub fn coerce_int96_to_resolution(
    parquet_schema: &SchemaDescriptor,
    file_schema: &Schema,
    time_unit: &TimeUnit,
) -> Option<Schema> {
    // Traverse the parquet_schema columns looking for int96 physical types. If encountered, insert
    // the field's full path into a set.
    let int96_fields: HashSet<_> = parquet_schema
        .columns()
        .iter()
        .filter(|f| f.physical_type() == Type::INT96)
        .map(|f| f.path().string())
        .collect();

    if int96_fields.is_empty() {
        // The schema doesn't contain any int96 fields, so skip the remaining logic.
        return None;
    }

    // Do a DFS into the schema using a stack, looking for timestamp(nanos) fields that originated
    // as int96 to coerce to the provided time_unit.

    type NestedFields = Rc<RefCell<Vec<FieldRef>>>;
    type StackContext<'a> = (
        Vec<&'a str>, // The Parquet column path (e.g., "c0.list.element.c1") for the current field.
        &'a FieldRef, // The current field to be processed.
        NestedFields, // The parent's fields that this field will be (possibly) type-coerced and
        // inserted into. All fields have a parent, so this is not an Option type.
        Option<NestedFields>, // Nested types need to create their own vector of fields for their
                              // children. For primitive types this will remain None. For nested
                              // types it is None the first time they are processed. Then, we
                              // instantiate a vector for its children, push the field back onto the
                              // stack to be processed again, and DFS into its children. The next
                              // time we process the field, we know we have DFS'd into the children
                              // because this field is Some.
    );

    // This is our top-level fields from which we will construct our schema. We pass this into our
    // initial stack context as the parent fields, and the DFS populates it.
    let fields = Rc::new(RefCell::new(Vec::with_capacity(file_schema.fields.len())));

    // TODO: It might be possible to only DFS into nested fields that we know contain an int96 if we
    // use some sort of LPM data structure to check if we're currently DFS'ing nested types that are
    // in a column path that contains an int96. That can be a future optimization for large schemas.
    let transformed_schema = {
        // Populate the stack with our top-level fields.
        let mut stack: Vec<StackContext> = file_schema
            .fields()
            .iter()
            .rev()
            .map(|f| (vec![f.name().as_str()], f, Rc::clone(&fields), None))
            .collect();

        // Pop fields to DFS into until we have exhausted the stack.
        while let Some((parquet_path, current_field, parent_fields, child_fields)) =
            stack.pop()
        {
            match (current_field.data_type(), child_fields) {
                (DataType::Struct(unprocessed_children), None) => {
                    // This is the first time popping off this struct. We don't yet know the
                    // correct types of its children (i.e., if they need coercing) so we create
                    // a vector for child_fields, push the struct node back onto the stack to be
                    // processed again (see below) after processing all its children.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(
                        unprocessed_children.len(),
                    )));
                    // Note that here we push the struct back onto the stack with its
                    // parent_fields in the same position, now with Some(child_fields).
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    // Push all the children in reverse to maintain original schema order due to
                    // stack processing.
                    for child in unprocessed_children.into_iter().rev() {
                        let mut child_path = parquet_path.clone();
                        // Build up a normalized path that we'll use as a key into the original
                        // int96_fields set above to test if this originated as int96.
                        child_path.push(".");
                        child_path.push(child.name());
                        // Note that here we push the field onto the stack using the struct's
                        // new child_fields vector as the field's parent_fields.
                        stack.push((child_path, child, Rc::clone(&child_fields), None));
                    }
                }
                (DataType::Struct(unprocessed_children), Some(processed_children)) => {
                    // This is the second time popping off this struct. The child_fields vector
                    // now contains each field that has been DFS'd into, and we can construct
                    // the resulting struct with correct child types.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), unprocessed_children.len());
                    let processed_struct = Field::new_struct(
                        current_field.name(),
                        processed_children.as_slice(),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_struct));
                }
                (DataType::List(unprocessed_child), None) => {
                    // This is the first time popping off this list. See struct docs above.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    // Spark uses a definition for arrays/lists that results in a group
                    // named "list" that is not maintained when parsing to Arrow. We just push
                    // this name into the path.
                    child_path.push(".list.");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::List(_), Some(processed_children)) => {
                    // This is the second time popping off this list. See struct docs above.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_list = Field::new_list(
                        current_field.name(),
                        Arc::clone(&processed_children[0]),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_list));
                }
                (DataType::Map(unprocessed_child, _), None) => {
                    // This is the first time popping off this map. See struct docs above.
                    let child_fields = Rc::new(RefCell::new(Vec::with_capacity(1)));
                    stack.push((
                        parquet_path.clone(),
                        current_field,
                        parent_fields,
                        Some(Rc::clone(&child_fields)),
                    ));
                    let mut child_path = parquet_path.clone();
                    child_path.push(".");
                    child_path.push(unprocessed_child.name());
                    stack.push((
                        child_path.clone(),
                        unprocessed_child,
                        Rc::clone(&child_fields),
                        None,
                    ));
                }
                (DataType::Map(_, sorted), Some(processed_children)) => {
                    // This is the second time popping off this map. See struct docs above.
                    let processed_children = processed_children.borrow();
                    assert_eq!(processed_children.len(), 1);
                    let processed_map = Field::new(
                        current_field.name(),
                        DataType::Map(Arc::clone(&processed_children[0]), *sorted),
                        current_field.is_nullable(),
                    );
                    parent_fields.borrow_mut().push(Arc::new(processed_map));
                }
                (DataType::Timestamp(TimeUnit::Nanosecond, None), None)
                    if int96_fields.contains(parquet_path.concat().as_str()) =>
                // We found a timestamp(nanos) and it originated as int96. Coerce it to the correct
                // time_unit.
                {
                    parent_fields.borrow_mut().push(field_with_new_type(
                        current_field,
                        DataType::Timestamp(*time_unit, None),
                    ));
                }
                // Other types can be cloned as they are.
                _ => parent_fields.borrow_mut().push(Arc::clone(current_field)),
            }
        }
        assert_eq!(fields.borrow().len(), file_schema.fields.len());
        Schema::new_with_metadata(
            fields.borrow_mut().clone(),
            file_schema.metadata.clone(),
        )
    };

    Some(transformed_schema)
}

/// Coerces the file schema if the table schema uses a view type.
#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_view_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| {
            let dt = f.data_type();
            if dt.equals_datatype(&DataType::Utf8View)
                || dt.equals_datatype(&DataType::BinaryView)
            {
                transform = true;
            }
            (f.name(), dt)
        })
        .collect();

    if !transform {
        return None;
    }

    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                (Some(DataType::Utf8View), DataType::Utf8 | DataType::LargeUtf8) => {
                    field_with_new_type(field, DataType::Utf8View)
                }
                (
                    Some(DataType::BinaryView),
                    DataType::Binary | DataType::LargeBinary,
                ) => field_with_new_type(field, DataType::BinaryView),
                _ => Arc::clone(field),
            },
        )
        .collect();

    Some(Schema::new_with_metadata(
        transformed_fields,
        file_schema.metadata.clone(),
    ))
}

/// If the table schema uses a string type, coerce the file schema to use a string type.
///
/// See [ParquetFormat::binary_as_string] for details
#[deprecated(
    since = "47.0.0",
    note = "Use `apply_file_schema_type_coercions` instead"
)]
pub fn coerce_file_schema_to_string_type(
    table_schema: &Schema,
    file_schema: &Schema,
) -> Option<Schema> {
    let mut transform = false;
    let table_fields: HashMap<_, _> = table_schema
        .fields
        .iter()
        .map(|f| (f.name(), f.data_type()))
        .collect();
    let transformed_fields: Vec<Arc<Field>> = file_schema
        .fields
        .iter()
        .map(
            |field| match (table_fields.get(field.name()), field.data_type()) {
                // table schema uses string type, coerce the file schema to use string type
                (
                    Some(DataType::Utf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8)
                }
                // table schema uses large string type, coerce the file schema to use large string type
                (
                    Some(DataType::LargeUtf8),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::LargeUtf8)
                }
                // table schema uses string view type, coerce the file schema to use view type
                (
                    Some(DataType::Utf8View),
                    DataType::Binary | DataType::LargeBinary | DataType::BinaryView,
                ) => {
                    transform = true;
                    field_with_new_type(field, DataType::Utf8View)
                }
                _ => Arc::clone(field),
            },
        )
        .collect();

    if !transform {
        None
    } else {
        Some(Schema::new_with_metadata(
            transformed_fields,
            file_schema.metadata.clone(),
        ))
    }
}

/// Create a new field with the specified data type, copying the other
/// properties from the input field
fn field_with_new_type(field: &FieldRef, new_type: DataType) -> FieldRef {
    Arc::new(field.as_ref().clone().with_data_type(new_type))
}

/// Transform a schema to use view types for Utf8 and Binary
///
/// See [ParquetFormat::force_view_types] for details
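///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("s", DataType::Utf8, true)]);
/// let transformed = transform_schema_to_view(&schema);
/// assert_eq!(transformed.field(0).data_type(), &DataType::Utf8View);
/// ```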
pub fn transform_schema_to_view(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Utf8 | DataType::LargeUtf8 => {
                field_with_new_type(field, DataType::Utf8View)
            }
            DataType::Binary | DataType::LargeBinary => {
                field_with_new_type(field, DataType::BinaryView)
            }
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// Transform a schema so that any binary types are strings
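///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// let schema = Schema::new(vec![Field::new("b", DataType::Binary, true)]);
/// let transformed = transform_binary_to_string(&schema);
/// assert_eq!(transformed.field(0).data_type(), &DataType::Utf8);
/// ```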
pub fn transform_binary_to_string(schema: &Schema) -> Schema {
    let transformed_fields: Vec<Arc<Field>> = schema
        .fields
        .iter()
        .map(|field| match field.data_type() {
            DataType::Binary => field_with_new_type(field, DataType::Utf8),
            DataType::LargeBinary => field_with_new_type(field, DataType::LargeUtf8),
            DataType::BinaryView => field_with_new_type(field, DataType::Utf8View),
            _ => Arc::clone(field),
        })
        .collect();
    Schema::new_with_metadata(transformed_fields, schema.metadata.clone())
}

/// [`MetadataFetch`] adapter for reading bytes from an [`ObjectStore`]
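///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// // `store` is assumed to be an `Arc<dyn ObjectStore>` and `meta` the file's `ObjectMeta`.
/// let fetch = ObjectStoreFetch::new(store.as_ref(), &meta);
/// // `fetch` can then be handed to parquet's async metadata reading APIs.
/// ```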
pub struct ObjectStoreFetch<'a> {
    store: &'a dyn ObjectStore,
    meta: &'a ObjectMeta,
}

impl<'a> ObjectStoreFetch<'a> {
    pub fn new(store: &'a dyn ObjectStore, meta: &'a ObjectMeta) -> Self {
        Self { store, meta }
    }
}

impl MetadataFetch for ObjectStoreFetch<'_> {
    fn fetch(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ParquetError>> {
        async {
            self.store
                .get_range(&self.meta.location, range)
                .await
                .map_err(ParquetError::from)
        }
        .boxed()
    }
}

/// Fetches parquet metadata from the ObjectStore for the given object
///
/// This component is subject to change in the near future and is exposed for low level
/// integrations through [`ParquetFileReaderFactory`].
///
/// [`ParquetFileReaderFactory`]: crate::ParquetFileReaderFactory
#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_metadata` instead"
)]
pub async fn fetch_parquet_metadata(
    store: &dyn ObjectStore,
    object_meta: &ObjectMeta,
    size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Arc<ParquetMetaData>> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, object_meta)
        .with_metadata_size_hint(size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_metadata()
        .await
}

/// Read and parse the statistics of the Parquet file at location `path`
///
/// See [`statistics_from_parquet_meta_calc`] for more details
#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::fetch_statistics` instead"
)]
pub async fn fetch_statistics(
    store: &dyn ObjectStore,
    table_schema: SchemaRef,
    file: &ObjectMeta,
    metadata_size_hint: Option<usize>,
    decryption_properties: Option<&FileDecryptionProperties>,
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
) -> Result<Statistics> {
    let decryption_properties = decryption_properties.cloned().map(Arc::new);
    DFParquetMetadata::new(store, file)
        .with_metadata_size_hint(metadata_size_hint)
        .with_decryption_properties(decryption_properties)
        .with_file_metadata_cache(file_metadata_cache)
        .fetch_statistics(&table_schema)
        .await
}

#[deprecated(
    since = "50.0.0",
    note = "Use `DFParquetMetadata::statistics_from_parquet_metadata` instead"
)]
pub fn statistics_from_parquet_meta_calc(
    metadata: &ParquetMetaData,
    table_schema: SchemaRef,
) -> Result<Statistics> {
    DFParquetMetadata::statistics_from_parquet_metadata(metadata, &table_schema)
}

/// Implements [`DataSink`] for writing to a parquet file.
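///
/// # Example (illustrative sketch, not compiled as a doctest)
///
/// ```ignore
/// // `config` is assumed to be a `FileSinkConfig` describing the output files.
/// let sink = ParquetSink::new(config, TableParquetOptions::default());
/// // After `write_all` completes, per-file parquet metadata is available:
/// let written = sink.written();
/// ```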
pub struct ParquetSink {
    /// Config options for writing data
    config: FileSinkConfig,
    /// Underlying parquet options
    parquet_options: TableParquetOptions,
    /// File metadata from successfully produced parquet files. The Mutex is only used
    /// to allow inserting into the HashMap from behind a borrowed reference in
    /// DataSink::write_all.
    written: Arc<parking_lot::Mutex<HashMap<Path, ParquetMetaData>>>,
}

impl Debug for ParquetSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ParquetSink").finish()
    }
}

impl DisplayAs for ParquetSink {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "ParquetSink(file_groups=",)?;
                FileGroupDisplay(&self.config.file_group).fmt_as(t, f)?;
                write!(f, ")")
            }
            DisplayFormatType::TreeRender => {
                // TODO: collect info
                write!(f, "")
            }
        }
    }
}

impl ParquetSink {
    /// Create from config.
    pub fn new(config: FileSinkConfig, parquet_options: TableParquetOptions) -> Self {
        Self {
            config,
            parquet_options,
            written: Default::default(),
        }
    }

    /// Retrieve the file metadata for the written files, keyed to the path
    /// which may be partitioned (in the case of hive style partitioning).
    pub fn written(&self) -> HashMap<Path, ParquetMetaData> {
        self.written.lock().clone()
    }

    /// Create writer properties based upon configuration settings,
    /// including partitioning and the inclusion of arrow schema metadata.
    async fn create_writer_props(
        &self,
        runtime: &Arc<RuntimeEnv>,
        path: &Path,
    ) -> Result<WriterProperties> {
        let schema = self.config.output_schema();

        // TODO: avoid this clone in follow up PR, where the writer properties & schema
        // are calculated once on `ParquetSink::new`
        let mut parquet_opts = self.parquet_options.clone();
        if !self.parquet_options.global.skip_arrow_metadata {
            parquet_opts.arrow_schema(schema);
        }

        let mut builder = WriterPropertiesBuilder::try_from(&parquet_opts)?;
        builder = set_writer_encryption_properties(
            builder,
            runtime,
            parquet_opts,
            schema,
            path,
        )
        .await?;
        Ok(builder.build())
    }

    /// Creates an AsyncArrowWriter which serializes a parquet file to an ObjectStore.
    /// AsyncArrowWriters are used when individual parquet file serialization is not parallelized.
    async fn create_async_arrow_writer(
        &self,
        location: &Path,
        object_store: Arc<dyn ObjectStore>,
        context: &Arc<TaskContext>,
        parquet_props: WriterProperties,
    ) -> Result<AsyncArrowWriter<BufWriter>> {
        let buf_writer = BufWriter::with_capacity(
            object_store,
            location.clone(),
            context
                .session_config()
                .options()
                .execution
                .objectstore_writer_buffer_size,
        );
        let options = ArrowWriterOptions::new()
            .with_properties(parquet_props)
            .with_skip_arrow_metadata(self.parquet_options.global.skip_arrow_metadata);

        let writer = AsyncArrowWriter::try_new_with_options(
            buf_writer,
            get_writer_schema(&self.config),
            options,
        )?;
        Ok(writer)
    }

    /// Parquet options
    pub fn parquet_options(&self) -> &TableParquetOptions {
        &self.parquet_options
    }
}

#[cfg(feature = "parquet_encryption")]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    runtime: &Arc<RuntimeEnv>,
    parquet_opts: TableParquetOptions,
    schema: &Arc<Schema>,
    path: &Path,
) -> Result<WriterPropertiesBuilder> {
    if let Some(file_encryption_properties) = parquet_opts.crypto.file_encryption {
        // Encryption properties have been specified directly
        return Ok(builder.with_file_encryption_properties(Arc::new(
            FileEncryptionProperties::from(file_encryption_properties),
        )));
    } else if let Some(encryption_factory_id) = &parquet_opts.crypto.factory_id.as_ref() {
        // Encryption properties will be generated by an encryption factory
        let encryption_factory =
            runtime.parquet_encryption_factory(encryption_factory_id)?;
        let file_encryption_properties = encryption_factory
            .get_file_encryption_properties(
                &parquet_opts.crypto.factory_options,
                schema,
                path,
            )
            .await?;
        if let Some(file_encryption_properties) = file_encryption_properties {
            return Ok(
                builder.with_file_encryption_properties(file_encryption_properties)
            );
        }
    }
    Ok(builder)
}

#[cfg(not(feature = "parquet_encryption"))]
async fn set_writer_encryption_properties(
    builder: WriterPropertiesBuilder,
    _runtime: &Arc<RuntimeEnv>,
    _parquet_opts: TableParquetOptions,
    _schema: &Arc<Schema>,
    _path: &Path,
) -> Result<WriterPropertiesBuilder> {
    Ok(builder)
}

#[async_trait]
impl FileSink for ParquetSink {
    fn config(&self) -> &FileSinkConfig {
        &self.config
    }

    async fn spawn_writer_tasks_and_join(
        &self,
        context: &Arc<TaskContext>,
        demux_task: SpawnedTask<Result<()>>,
        mut file_stream_rx: DemuxedStreamReceiver,
        object_store: Arc<dyn ObjectStore>,
    ) -> Result<u64> {
        let parquet_opts = &self.parquet_options;

        let mut file_write_tasks: JoinSet<
            std::result::Result<(Path, ParquetMetaData), DataFusionError>,
        > = JoinSet::new();

        let runtime = context.runtime_env();
        let parallel_options = ParallelParquetWriterOptions {
            max_parallel_row_groups: parquet_opts
                .global
                .maximum_parallel_row_group_writers,
            max_buffered_record_batches_per_stream: parquet_opts
                .global
                .maximum_buffered_record_batches_per_stream,
        };

        while let Some((path, mut rx)) = file_stream_rx.recv().await {
            let parquet_props = self.create_writer_props(&runtime, &path).await?;
            if !parquet_opts.global.allow_single_file_parallelism {
                let mut writer = self
                    .create_async_arrow_writer(
                        &path,
                        Arc::clone(&object_store),
                        context,
                        parquet_props.clone(),
                    )
                    .await?;
                let mut reservation = MemoryConsumer::new(format!("ParquetSink[{path}]"))
                    .register(context.memory_pool());
                file_write_tasks.spawn(async move {
                    while let Some(batch) = rx.recv().await {
                        writer.write(&batch).await?;
                        reservation.try_resize(writer.memory_size())?;
                    }
                    let parquet_meta_data = writer
                        .close()
                        .await
                        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))?;
                    Ok((path, parquet_meta_data))
                });
            } else {
                let writer = ObjectWriterBuilder::new(
                    // Parquet files as a whole are never compressed, since they
                    // manage compressed blocks themselves.
                    FileCompressionType::UNCOMPRESSED,
                    &path,
                    Arc::clone(&object_store),
                )
                .with_buffer_size(Some(
                    context
                        .session_config()
                        .options()
                        .execution
                        .objectstore_writer_buffer_size,
                ))
                .build()?;
                let schema = get_writer_schema(&self.config);
                let props = parquet_props.clone();
                let skip_arrow_metadata = self.parquet_options.global.skip_arrow_metadata;
                let parallel_options_clone = parallel_options.clone();
                let pool = Arc::clone(context.memory_pool());
                file_write_tasks.spawn(async move {
                    let parquet_meta_data = output_single_parquet_file_parallelized(
                        writer,
                        rx,
                        schema,
                        &props,
                        skip_arrow_metadata,
                        parallel_options_clone,
                        pool,
                    )
                    .await?;
                    Ok((path, parquet_meta_data))
                });
            }
        }

        let mut row_count = 0;
        while let Some(result) = file_write_tasks.join_next().await {
            match result {
                Ok(r) => {
                    let (path, parquet_meta_data) = r?;
                    row_count += parquet_meta_data.file_metadata().num_rows();
                    let mut written_files = self.written.lock();
                    written_files
                        .try_insert(path.clone(), parquet_meta_data)
                        .map_err(|e| internal_datafusion_err!("duplicate entry detected for partitioned file {path}: {e}"))?;
                    drop(written_files);
                }
                Err(e) => {
                    if e.is_panic() {
                        std::panic::resume_unwind(e.into_panic());
                    } else {
                        unreachable!();
                    }
                }
            }
        }

        demux_task
            .join_unwind()
            .await
            .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;

        Ok(row_count as u64)
    }
}

#[async_trait]
impl DataSink for ParquetSink {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> &SchemaRef {
        self.config.output_schema()
    }

    async fn write_all(
        &self,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64> {
        FileSink::write_all(self, data, context).await
    }
}

1370/// Consumes a stream of [ArrowLeafColumn] via a channel and serializes them using an [ArrowColumnWriter].
1371/// Once the channel is exhausted, returns the [ArrowColumnWriter] along with its [MemoryReservation].
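///
/// # Example
///
/// A minimal sketch of how this task is typically wired up (illustrative only; `writer`,
/// `pool`, `field`, and `array` are assumed to be in scope):
///
/// ```ignore
/// let (tx, rx) = mpsc::channel::<ArrowLeafColumn>(8);
/// let reservation = MemoryConsumer::new("example").register(&pool);
/// let task = SpawnedTask::spawn(column_serializer_task(rx, writer, reservation));
/// for leaf in compute_leaves(field, array)? {
///     // Ignore send errors: a closed channel means the task already stopped.
///     let _ = tx.send(leaf).await;
/// }
/// // Dropping the sender closes the channel, letting the task return the writer.
/// drop(tx);
/// let (writer, _reservation) = task
///     .join_unwind()
///     .await
///     .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
/// ```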
1372async fn column_serializer_task(
1373    mut rx: Receiver<ArrowLeafColumn>,
1374    mut writer: ArrowColumnWriter,
1375    mut reservation: MemoryReservation,
1376) -> Result<(ArrowColumnWriter, MemoryReservation)> {
1377    while let Some(col) = rx.recv().await {
1378        writer.write(&col)?;
1379        reservation.try_resize(writer.memory_size())?;
1380    }
1381    Ok((writer, reservation))
1382}
1383
1384type ColumnWriterTask = SpawnedTask<Result<(ArrowColumnWriter, MemoryReservation)>>;
1385type ColSender = Sender<ArrowLeafColumn>;
1386
1387/// Spawns a parallel serialization task for each column.
1388/// Returns join handles for each column's serialization task along with a send channel
1389/// to send arrow arrays to each serialization task.
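///
/// # Example
///
/// A rough sketch of the intended usage (illustrative only; `row_group_writer_factory`,
/// `max_buffered_batches`, and `pool` are assumed to be in scope):
///
/// ```ignore
/// // One ArrowColumnWriter per leaf column for row group 0
/// let col_writers = row_group_writer_factory.create_column_writers(0)?;
/// let (col_writer_tasks, col_senders) =
///     spawn_column_parallel_row_group_writer(col_writers, max_buffered_batches, &pool)?;
/// // Record batches can now be fanned out via `send_arrays_to_col_writers`,
/// // and the tasks joined via `spawn_rg_join_and_finalize_task`.
/// ```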
1390fn spawn_column_parallel_row_group_writer(
1391    col_writers: Vec<ArrowColumnWriter>,
1392    max_buffer_size: usize,
1393    pool: &Arc<dyn MemoryPool>,
1394) -> Result<(Vec<ColumnWriterTask>, Vec<ColSender>)> {
1395    let num_columns = col_writers.len();
1396
1397    let mut col_writer_tasks = Vec::with_capacity(num_columns);
1398    let mut col_array_channels = Vec::with_capacity(num_columns);
1399    for writer in col_writers.into_iter() {
1400        // Buffer size of this channel limits the number of arrays queued up for column level serialization
1401        let (send_array, receive_array) =
1402            mpsc::channel::<ArrowLeafColumn>(max_buffer_size);
1403        col_array_channels.push(send_array);
1404
1405        let reservation =
1406            MemoryConsumer::new("ParquetSink(ArrowColumnWriter)").register(pool);
1407        let task = SpawnedTask::spawn(column_serializer_task(
1408            receive_array,
1409            writer,
1410            reservation,
1411        ));
1412        col_writer_tasks.push(task);
1413    }
1414
1415    Ok((col_writer_tasks, col_array_channels))
1416}
1417
1418/// Settings related to writing parquet files in parallel
1419#[derive(Clone)]
1420struct ParallelParquetWriterOptions {
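    /// Maximum number of row groups that may be serialized concurrently
    /// (bounds the channel of spawned row-group finalization tasks)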
1421    max_parallel_row_groups: usize,
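    /// Bounds the per-column channel used to queue leaf arrays for serialization,
    /// i.e. roughly the number of buffered record batches per column writer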
1422    max_buffered_record_batches_per_stream: usize,
1423}
1424
1425/// The result of calling [ArrowColumnWriter].close() on each column in a row group: the Vec of
1426/// encoded column chunks to append to the row group, plus its [MemoryReservation] and row count.
1427type RBStreamSerializeResult = Result<(Vec<ArrowColumnChunk>, MemoryReservation, usize)>;
1428
1429/// Sends the Arrow arrays in the passed [RecordBatch] through the channels to their respective
1430/// parallel column serializers.
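///
/// # Example
///
/// Illustrative only; note that channels are per *leaf* column, so a nested field may
/// own several senders (`col_senders`, `record_batch`, and `schema` are assumed):
///
/// ```ignore
/// send_arrays_to_col_writers(&col_senders, &record_batch, Arc::clone(&schema)).await?;
/// ```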
1431async fn send_arrays_to_col_writers(
1432    col_array_channels: &[ColSender],
1433    rb: &RecordBatch,
1434    schema: Arc<Schema>,
1435) -> Result<()> {
1436    // Each leaf column has its own channel; increment next_channel for each leaf column sent.
1437    let mut next_channel = 0;
1438    for (array, field) in rb.columns().iter().zip(schema.fields()) {
1439        for c in compute_leaves(field, array)? {
1440            // Do not surface error from closed channel (means something
1441            // else hit an error, and the plan is shutting down).
1442            if col_array_channels[next_channel].send(c).await.is_err() {
1443                return Ok(());
1444            }
1445
1446            next_channel += 1;
1447        }
1448    }
1449
1450    Ok(())
1451}
1452
1453/// Spawns a tokio task which joins the parallel column writer tasks
1454/// and finalizes the row group.
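///
/// # Example
///
/// A sketch of how a row group is finalized, mirroring the usage in
/// [spawn_parquet_parallel_serialization_task] (illustrative only; names are assumed
/// to be in scope):
///
/// ```ignore
/// // Close the per-column channels first so the column tasks can finish
/// drop(col_array_channels);
/// let finalize_rg_task =
///     spawn_rg_join_and_finalize_task(column_writer_handles, current_rg_rows, &pool);
/// // Hand the task off to the concatenation side; a closed channel just means
/// // the plan is shutting down.
/// let _ = serialize_tx.send(finalize_rg_task).await;
/// ```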
1455fn spawn_rg_join_and_finalize_task(
1456    column_writer_tasks: Vec<ColumnWriterTask>,
1457    rg_rows: usize,
1458    pool: &Arc<dyn MemoryPool>,
1459) -> SpawnedTask<RBStreamSerializeResult> {
1460    let mut rg_reservation =
1461        MemoryConsumer::new("ParquetSink(SerializedRowGroupWriter)").register(pool);
1462
1463    SpawnedTask::spawn(async move {
1464        let num_cols = column_writer_tasks.len();
1465        let mut finalized_rg = Vec::with_capacity(num_cols);
1466        for task in column_writer_tasks.into_iter() {
1467            let (writer, _col_reservation) = task
1468                .join_unwind()
1469                .await
1470                .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1471            let encoded_size = writer.get_estimated_total_bytes();
1472            rg_reservation.grow(encoded_size);
1473            finalized_rg.push(writer.close()?);
1474        }
1475
1476        Ok((finalized_rg, rg_reservation, rg_rows))
1477    })
1478}
1479
1480/// This task coordinates the serialization of a parquet file in parallel.
1481/// As the query produces RecordBatches, these are written to a RowGroup
1482/// via parallel [ArrowColumnWriter] tasks. Once the desired max rows per
1483/// row group is reached, the column tasks are joined and the row group is
1484/// finalized on a separate task whose handle is sent to a concatenation task,
1485/// while this coordination task immediately moves on to the next row group.
1486/// So, parquet serialization is parallelized across both columns and row groups,
1487/// with a theoretical max number of parallel tasks given by n_columns * num_row_groups.
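///
/// # Example
///
/// A sketch of how this task is connected to the rest of the pipeline, mirroring the
/// wiring in [output_single_parquet_file_parallelized] (illustrative only; the names
/// used are assumed to be in scope):
///
/// ```ignore
/// let (serialize_tx, serialize_rx) =
///     mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
/// let launch_serialization_task = spawn_parquet_parallel_serialization_task(
///     row_group_writer_factory,
///     data,                        // Receiver<RecordBatch> for this output file
///     serialize_tx,
///     Arc::clone(&schema),
///     Arc::clone(&writer_props),   // Arc<WriterProperties>
///     parallel_options,
///     Arc::clone(&pool),
/// );
/// // `concatenate_parallel_row_groups` drains `serialize_rx` on the other end.
/// ```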
1488fn spawn_parquet_parallel_serialization_task(
1489    row_group_writer_factory: ArrowRowGroupWriterFactory,
1490    mut data: Receiver<RecordBatch>,
1491    serialize_tx: Sender<SpawnedTask<RBStreamSerializeResult>>,
1492    schema: Arc<Schema>,
1493    writer_props: Arc<WriterProperties>,
1494    parallel_options: ParallelParquetWriterOptions,
1495    pool: Arc<dyn MemoryPool>,
1496) -> SpawnedTask<Result<(), DataFusionError>> {
1497    SpawnedTask::spawn(async move {
1498        let max_buffer_rb = parallel_options.max_buffered_record_batches_per_stream;
1499        let max_row_group_rows = writer_props.max_row_group_size();
1500        let mut row_group_index = 0;
1501        let col_writers =
1502            row_group_writer_factory.create_column_writers(row_group_index)?;
1503        let (mut column_writer_handles, mut col_array_channels) =
1504            spawn_column_parallel_row_group_writer(col_writers, max_buffer_rb, &pool)?;
1505        let mut current_rg_rows = 0;
1506
1507        while let Some(mut rb) = data.recv().await {
1508            // This loop allows the "else" block to repeatedly split the RecordBatch to handle the case
1509            // when max_row_group_rows < execution.batch_size as an alternative to a recursive async
1510            // function.
1511            loop {
1512                if current_rg_rows + rb.num_rows() < max_row_group_rows {
1513                    send_arrays_to_col_writers(
1514                        &col_array_channels,
1515                        &rb,
1516                        Arc::clone(&schema),
1517                    )
1518                    .await?;
1519                    current_rg_rows += rb.num_rows();
1520                    break;
1521                } else {
1522                    let rows_left = max_row_group_rows - current_rg_rows;
1523                    let a = rb.slice(0, rows_left);
1524                    send_arrays_to_col_writers(
1525                        &col_array_channels,
1526                        &a,
1527                        Arc::clone(&schema),
1528                    )
1529                    .await?;
1530
1531                    // Signal the parallel column writers that the RowGroup is done, join and finalize RowGroup
1532                    // on a separate task, so that we can immediately start on the next RG before waiting
1533                    // for the current one to finish.
1534                    drop(col_array_channels);
1535                    let finalize_rg_task = spawn_rg_join_and_finalize_task(
1536                        column_writer_handles,
1537                        max_row_group_rows,
1538                        &pool,
1539                    );
1540
1541                    // Do not surface error from closed channel (means something
1542                    // else hit an error, and the plan is shutting down).
1543                    if serialize_tx.send(finalize_rg_task).await.is_err() {
1544                        return Ok(());
1545                    }
1546
1547                    current_rg_rows = 0;
1548                    rb = rb.slice(rows_left, rb.num_rows() - rows_left);
1549
1550                    row_group_index += 1;
1551                    let col_writers = row_group_writer_factory
1552                        .create_column_writers(row_group_index)?;
1553                    (column_writer_handles, col_array_channels) =
1554                        spawn_column_parallel_row_group_writer(
1555                            col_writers,
1556                            max_buffer_rb,
1557                            &pool,
1558                        )?;
1559                }
1560            }
1561        }
1562
1563        drop(col_array_channels);
1564        // Handle any leftover rows as the final row group, which may be smaller than max_row_group_rows
1565        if current_rg_rows > 0 {
1566            let finalize_rg_task = spawn_rg_join_and_finalize_task(
1567                column_writer_handles,
1568                current_rg_rows,
1569                &pool,
1570            );
1571
1572            // Do not surface error from closed channel (means something
1573            // else hit an error, and the plan is shutting down).
1574            if serialize_tx.send(finalize_rg_task).await.is_err() {
1575                return Ok(());
1576            }
1577        }
1578
1579        Ok(())
1580    })
1581}
1582
1583/// Consumes RowGroups serialized by other parallel tasks and concatenates them into
1584/// the final parquet file, while flushing finalized bytes to an [ObjectStore].
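///
/// # Example
///
/// The consumer side of the pipeline, as wired up by
/// [output_single_parquet_file_parallelized] (illustrative only; names are assumed):
///
/// ```ignore
/// let parquet_meta_data = concatenate_parallel_row_groups(
///     serialized_file_writer, // SerializedFileWriter<SharedBuffer>
///     merged_buff,            // the SharedBuffer the writer flushes into
///     serialize_rx,           // finalized row group tasks from the serialization task
///     object_store_writer,
///     pool,
/// )
/// .await?;
/// ```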
1585async fn concatenate_parallel_row_groups(
1586    mut parquet_writer: SerializedFileWriter<SharedBuffer>,
1587    merged_buff: SharedBuffer,
1588    mut serialize_rx: Receiver<SpawnedTask<RBStreamSerializeResult>>,
1589    mut object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1590    pool: Arc<dyn MemoryPool>,
1591) -> Result<ParquetMetaData> {
1592    let mut file_reservation =
1593        MemoryConsumer::new("ParquetSink(SerializedFileWriter)").register(&pool);
1594
1595    while let Some(task) = serialize_rx.recv().await {
1596        let result = task.join_unwind().await;
1597        let (serialized_columns, mut rg_reservation, _cnt) =
1598            result.map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1599
1600        let mut rg_out = parquet_writer.next_row_group()?;
1601        for chunk in serialized_columns {
1602            chunk.append_to_row_group(&mut rg_out)?;
1603            rg_reservation.free();
1604
1605            let mut buff_to_flush = merged_buff.buffer.try_lock().unwrap();
1606            file_reservation.try_resize(buff_to_flush.len())?;
1607
1608            if buff_to_flush.len() > BUFFER_FLUSH_BYTES {
1609                object_store_writer
1610                    .write_all(buff_to_flush.as_slice())
1611                    .await?;
1612                buff_to_flush.clear();
1613                file_reservation.try_resize(buff_to_flush.len())?; // will set to zero
1614            }
1615        }
1616        rg_out.close()?;
1617    }
1618
1619    let parquet_meta_data = parquet_writer.close()?;
1620    let final_buff = merged_buff.buffer.try_lock().unwrap();
1621
1622    object_store_writer.write_all(final_buff.as_slice()).await?;
1623    object_store_writer.shutdown().await?;
1624    file_reservation.free();
1625
1626    Ok(parquet_meta_data)
1627}
1628
1629/// Parallelizes the serialization of a single parquet file by first serializing its
1630/// RowGroups in parallel in memory (one task per column within each row group). Another
1631/// task then stitches these independent RowGroups together and streams the resulting
1632/// single parquet file to an ObjectStore in multiple parts.
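///
/// # Example
///
/// A sketch mirroring how [ParquetSink] invokes this function for each output file
/// (illustrative only; `writer`, `rx`, `config`, `writer_props`, `parallel_options`,
/// and `pool` are assumed to be in scope):
///
/// ```ignore
/// let parquet_meta_data = output_single_parquet_file_parallelized(
///     writer,                      // object store writer for this file
///     rx,                          // Receiver<RecordBatch> for this file
///     get_writer_schema(&config),
///     &writer_props,
///     skip_arrow_metadata,
///     parallel_options,
///     Arc::clone(&pool),
/// )
/// .await?;
/// let row_count = parquet_meta_data.file_metadata().num_rows();
/// ```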
1633async fn output_single_parquet_file_parallelized(
1634    object_store_writer: Box<dyn AsyncWrite + Send + Unpin>,
1635    data: Receiver<RecordBatch>,
1636    output_schema: Arc<Schema>,
1637    parquet_props: &WriterProperties,
1638    skip_arrow_metadata: bool,
1639    parallel_options: ParallelParquetWriterOptions,
1640    pool: Arc<dyn MemoryPool>,
1641) -> Result<ParquetMetaData> {
1642    let max_rowgroups = parallel_options.max_parallel_row_groups;
1643    // Buffer size of this channel limits the maximum number of RowGroups being worked on in parallel
1644    let (serialize_tx, serialize_rx) =
1645        mpsc::channel::<SpawnedTask<RBStreamSerializeResult>>(max_rowgroups);
1646
1647    let arc_props = Arc::new(parquet_props.clone());
1648    let merged_buff = SharedBuffer::new(INITIAL_BUFFER_BYTES);
1649    let options = ArrowWriterOptions::new()
1650        .with_properties(parquet_props.clone())
1651        .with_skip_arrow_metadata(skip_arrow_metadata);
1652    let writer = ArrowWriter::try_new_with_options(
1653        merged_buff.clone(),
1654        Arc::clone(&output_schema),
1655        options,
1656    )?;
1657    let (writer, row_group_writer_factory) = writer.into_serialized_writer()?;
1658
1659    let launch_serialization_task = spawn_parquet_parallel_serialization_task(
1660        row_group_writer_factory,
1661        data,
1662        serialize_tx,
1663        Arc::clone(&output_schema),
1664        Arc::clone(&arc_props),
1665        parallel_options,
1666        Arc::clone(&pool),
1667    );
1668    let parquet_meta_data = concatenate_parallel_row_groups(
1669        writer,
1670        merged_buff,
1671        serialize_rx,
1672        object_store_writer,
1673        pool,
1674    )
1675    .await?;
1676
1677    launch_serialization_task
1678        .join_unwind()
1679        .await
1680        .map_err(|e| DataFusionError::ExecutionJoin(Box::new(e)))??;
1681    Ok(parquet_meta_data)
1682}
1683
1684#[cfg(test)]
1685mod tests {
1686    use parquet::arrow::parquet_to_arrow_schema;
1687    use std::sync::Arc;
1688
1689    use super::*;
1690
1691    use arrow::datatypes::DataType;
1692    use parquet::schema::parser::parse_message_type;
1693
1694    #[test]
1695    fn coerce_int96_to_resolution_with_mixed_timestamps() {
1696        // It is unclear whether Spark (or another writer) could generate a file with mixed timestamps
1697        // like this, but we want to test the scenario just in case, since it is at least a valid schema
1698        // as far as the Parquet spec is concerned.
1699        let spark_schema = "
1700        message spark_schema {
1701          optional int96 c0;
1702          optional int64 c1 (TIMESTAMP(NANOS,true));
1703          optional int64 c2 (TIMESTAMP(NANOS,false));
1704          optional int64 c3 (TIMESTAMP(MILLIS,true));
1705          optional int64 c4 (TIMESTAMP(MILLIS,false));
1706          optional int64 c5 (TIMESTAMP(MICROS,true));
1707          optional int64 c6 (TIMESTAMP(MICROS,false));
1708        }
1709        ";
1710
1711        let schema = parse_message_type(spark_schema).expect("should parse schema");
1712        let descr = SchemaDescriptor::new(Arc::new(schema));
1713
1714        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1715
1716        let result =
1717            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1718                .unwrap();
1719
1720        // Only the first field (c0) should be converted to a microsecond timestamp because it's the
1721        // only timestamp that originated from an INT96.
1722        let expected_schema = Schema::new(vec![
1723            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1724            Field::new(
1725                "c1",
1726                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
1727                true,
1728            ),
1729            Field::new("c2", DataType::Timestamp(TimeUnit::Nanosecond, None), true),
1730            Field::new(
1731                "c3",
1732                DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".into())),
1733                true,
1734            ),
1735            Field::new("c4", DataType::Timestamp(TimeUnit::Millisecond, None), true),
1736            Field::new(
1737                "c5",
1738                DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
1739                true,
1740            ),
1741            Field::new("c6", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1742        ]);
1743
1744        assert_eq!(result, expected_schema);
1745    }
1746
1747    #[test]
1748    fn coerce_int96_to_resolution_with_nested_types() {
1749        // This schema is derived from Comet's CometFuzzTestSuite ParquetGenerator, using only int96
1750        // primitive types with generateStruct, generateArray, and generateMap set to true, plus one
1751        // additional field added to c4's struct to make sure all fields in a struct get modified.
1752        // https://github.com/apache/datafusion-comet/blob/main/spark/src/main/scala/org/apache/comet/testing/ParquetGenerator.scala
1753        let spark_schema = "
1754        message spark_schema {
1755          optional int96 c0;
1756          optional group c1 {
1757            optional int96 c0;
1758          }
1759          optional group c2 {
1760            optional group c0 (LIST) {
1761              repeated group list {
1762                optional int96 element;
1763              }
1764            }
1765          }
1766          optional group c3 (LIST) {
1767            repeated group list {
1768              optional int96 element;
1769            }
1770          }
1771          optional group c4 (LIST) {
1772            repeated group list {
1773              optional group element {
1774                optional int96 c0;
1775                optional int96 c1;
1776              }
1777            }
1778          }
1779          optional group c5 (MAP) {
1780            repeated group key_value {
1781              required int96 key;
1782              optional int96 value;
1783            }
1784          }
1785          optional group c6 (LIST) {
1786            repeated group list {
1787              optional group element (MAP) {
1788                repeated group key_value {
1789                  required int96 key;
1790                  optional int96 value;
1791                }
1792              }
1793            }
1794          }
1795        }
1796        ";
1797
1798        let schema = parse_message_type(spark_schema).expect("should parse schema");
1799        let descr = SchemaDescriptor::new(Arc::new(schema));
1800
1801        let arrow_schema = parquet_to_arrow_schema(&descr, None).unwrap();
1802
1803        let result =
1804            coerce_int96_to_resolution(&descr, &arrow_schema, &TimeUnit::Microsecond)
1805                .unwrap();
1806
1807        let expected_schema = Schema::new(vec![
1808            Field::new("c0", DataType::Timestamp(TimeUnit::Microsecond, None), true),
1809            Field::new_struct(
1810                "c1",
1811                vec![Field::new(
1812                    "c0",
1813                    DataType::Timestamp(TimeUnit::Microsecond, None),
1814                    true,
1815                )],
1816                true,
1817            ),
1818            Field::new_struct(
1819                "c2",
1820                vec![Field::new_list(
1821                    "c0",
1822                    Field::new(
1823                        "element",
1824                        DataType::Timestamp(TimeUnit::Microsecond, None),
1825                        true,
1826                    ),
1827                    true,
1828                )],
1829                true,
1830            ),
1831            Field::new_list(
1832                "c3",
1833                Field::new(
1834                    "element",
1835                    DataType::Timestamp(TimeUnit::Microsecond, None),
1836                    true,
1837                ),
1838                true,
1839            ),
1840            Field::new_list(
1841                "c4",
1842                Field::new_struct(
1843                    "element",
1844                    vec![
1845                        Field::new(
1846                            "c0",
1847                            DataType::Timestamp(TimeUnit::Microsecond, None),
1848                            true,
1849                        ),
1850                        Field::new(
1851                            "c1",
1852                            DataType::Timestamp(TimeUnit::Microsecond, None),
1853                            true,
1854                        ),
1855                    ],
1856                    true,
1857                ),
1858                true,
1859            ),
1860            Field::new_map(
1861                "c5",
1862                "key_value",
1863                Field::new(
1864                    "key",
1865                    DataType::Timestamp(TimeUnit::Microsecond, None),
1866                    false,
1867                ),
1868                Field::new(
1869                    "value",
1870                    DataType::Timestamp(TimeUnit::Microsecond, None),
1871                    true,
1872                ),
1873                false,
1874                true,
1875            ),
1876            Field::new_list(
1877                "c6",
1878                Field::new_map(
1879                    "element",
1880                    "key_value",
1881                    Field::new(
1882                        "key",
1883                        DataType::Timestamp(TimeUnit::Microsecond, None),
1884                        false,
1885                    ),
1886                    Field::new(
1887                        "value",
1888                        DataType::Timestamp(TimeUnit::Microsecond, None),
1889                        true,
1890                    ),
1891                    false,
1892                    true,
1893                ),
1894                true,
1895            ),
1896        ]);
1897
1898        assert_eq!(result, expected_schema);
1899    }
1900}