datafusion_common/config.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
#[cfg(feature = "parquet_encryption")]
use hex;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///    /// Amazing config
///    pub struct MyConfig {
///        /// Field 1 doc
///        field1: String, transform = str::to_lowercase, default = "".to_string()
///
///        /// Field 2 doc
///        field2: usize, default = 232
///
///        /// Field 3 doc
///        field3: Option<usize>, default = None
///    }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize identifiers (convert them to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options values (convert values to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // no need to lowercase because `sqlparser::dialect_from_str` is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always appear first, in any order.
        /// - `nulls_last`: Nulls always appear last, in any order.
        ///
        /// By default, `nulls_max` is used, following Postgres's behavior.
        /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}
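
// A minimal sketch of configuring the parser through the dotted key paths
// generated by `config_namespace!`. Note that `Dialect::from_str` below also
// accepts "postgres" as an alias for PostgreSQL:
//
// let mut config = ConfigOptions::new();
// config.set("datafusion.sql_parser.dialect", "postgres")?;
// assert_eq!(config.sql_parser.dialect, Dialect::PostgreSQL);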

/// This is the SQL dialect used by DataFusion's parser.
/// This mirrors the [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}

impl AsRef<str> for Dialect {
    fn as_ref(&self) -> &str {
        match self {
            Self::Generic => "generic",
            Self::MySQL => "mysql",
            Self::PostgreSQL => "postgresql",
            Self::Hive => "hive",
            Self::SQLite => "sqlite",
            Self::Snowflake => "snowflake",
            Self::Redshift => "redshift",
            Self::MsSQL => "mssql",
            Self::ClickHouse => "clickhouse",
            Self::BigQuery => "bigquery",
            Self::Ansi => "ansi",
            Self::DuckDB => "duckdb",
            Self::Databricks => "databricks",
        }
    }
}

impl FromStr for Dialect {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let value = match s.to_ascii_lowercase().as_str() {
            "generic" => Self::Generic,
            "mysql" => Self::MySQL,
            "postgresql" | "postgres" => Self::PostgreSQL,
            "hive" => Self::Hive,
            "sqlite" => Self::SQLite,
            "snowflake" => Self::Snowflake,
            "redshift" => Self::Redshift,
            "mssql" => Self::MsSQL,
            "clickhouse" => Self::ClickHouse,
            "bigquery" => Self::BigQuery,
            "ansi" => Self::Ansi,
            "duckdb" => Self::DuckDB,
            "databricks" => Self::Databricks,
            other => {
                let error_message = format!(
                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
                );
                return Err(DataFusionError::Configuration(error_message));
            }
        };
        Ok(value)
    }
}

impl ConfigField for Dialect {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = Self::from_str(value)?;
        Ok(())
    }
}

impl Display for Dialect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = self.as_ref();
        write!(f, "{str}")
    }
}
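
// Example (sketch): `Dialect` converts between its enum and string forms via
// `FromStr` and `Display`, and parsing is case-insensitive:
//
// let dialect = Dialect::from_str("DuckDB")?;
// assert_eq!(dialect.to_string(), "duckdb");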

#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}
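
// Example (sketch): a configured spill compression maps onto the Arrow IPC
// codec used by the spill writer; `Uncompressed` maps to `None`.
//
// let codec: Option<CompressionType> = SpillCompression::from_str("zstd")?.into();
// assert_eq!(codec, Some(CompressionType::ZSTD));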

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches. This is especially useful for
        /// buffered in-memory batches, since creating many tiny batches results in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to workaround bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum number of output files being written in parallel.
        /// RecordBatches will be distributed in round robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked on. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the ratio exceeds this value,
        /// partial aggregation will skip aggregation for further input
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows partial aggregation partition should process, before
        /// aggregation ratio check and trying to switch to skipping aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote endpoint.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}
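
// A minimal sketch of tuning execution options through their dotted keys.
// Setting `target_partitions` to "0" is normalized to the number of available
// CPU cores by `ExecutionOptions::normalized_parallelism` (defined below).
//
// let mut config = ConfigOptions::new();
// config.set("datafusion.execution.batch_size", "16384")?;
// config.set("datafusion.execution.target_partitions", "0")?;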

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// One read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer.
        /// The default is 512 KiB, which should be sufficient for most parquet files;
        /// it can save one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
        /// and `Binary/LargeBinary` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96, allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel
        /// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}
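
// A minimal sketch of wiring up an encryption factory. `MyEncryptionConfig`
// is a hypothetical type implementing `ExtensionOptions`, and "my_factory" a
// hypothetical identifier registered via
// `RuntimeEnv::register_parquet_encryption_factory`.
//
// let my_encryption_config = MyEncryptionConfig::default(); // hypothetical
// let mut options = ParquetEncryptionOptions::default();
// options.configure_factory("my_factory", &my_encryption_config);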

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
        /// into the file scan phase.
        pub enable_topk_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will attempt to push down Join dynamic filters
        /// into the file scan phase.
        pub enable_join_dynamic_filter_pushdown: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators (TopK & Join) into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        /// This config overrides `enable_topk_dynamic_filter_pushdown` & `enable_join_dynamic_filter_pushdown`:
        /// if you disable `enable_topk_dynamic_filter_pushdown` and then enable `enable_dynamic_filter_pushdown`,
        /// `enable_topk_dynamic_filter_pushdown` will be overridden.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total file size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long-running execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partition keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        ///      "SortExec: [a@0 ASC]",
        ///      "  CoalescePartitionsExec",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        ///      "SortPreservingMergeExec: [a@0 ASC]",
        ///      "  SortExec: [a@0 ASC]",
        ///      "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
        /// experimental. The physical planner will opt for PiecewiseMergeJoin when there is only
        /// one range filter.
        pub enable_piecewise_merge_join: bool, default = false

        /// The maximum estimated size in bytes for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size in rows for one input side of a HashJoin
        /// to be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev".
        /// "summary" shows common metrics for high-level insights.
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
    }
}

impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}
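
// For example, "0" expands to the detected core count, while any other value
// passes through unchanged:
//
// assert_eq!(
//     ExecutionOptions::normalized_parallelism("0"),
//     get_available_parallelism().to_string()
// );
// assert_eq!(ExecutionOptions::normalized_parallelism("8"), "8");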

config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show data types in the visual representation of batches
        pub types_info: bool, default = false
    }
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}
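
// A minimal sketch of performing the conversion above; it fails only if
// `duration_format` is neither "pretty" nor "iso8601".
//
// let options = FormatOptions::default();
// let arrow_options: arrow::util::display::FormatOptions = (&options).try_into()?;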

/// A key value pair, with a corresponding description
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            if key == "optimizer.enable_dynamic_filter_pushdown" {
                let bool_value = value.parse::<bool>().map_err(|e| {
                    DataFusionError::Configuration(format!(
                        "Failed to parse '{value}' as bool: {e}"
                    ))
                })?;

                // The umbrella flag also controls the specialized TopK and
                // Join dynamic filter pushdown flags
                self.optimizer.enable_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
                return Ok(());
            }
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }
1175
1176    /// Create new [`ConfigOptions`], taking values from environment variables
1177    /// where possible.
1178    ///
1179    /// For example, to configure `datafusion.execution.batch_size`
1180    /// ([`ExecutionOptions::batch_size`]) you would set the
1181    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
1182    ///
1183    /// The name of the environment variable is the option's key, transformed to
1184    /// uppercase and with periods replaced with underscores.
1185    ///
1186    /// Values are parsed according to the [same rules used in casts from
1187    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
1188    ///
1189    /// If the value in the environment variable cannot be cast to the type of
1190    /// the configuration option, the default value will be used instead and a
1191    /// warning emitted. Environment variables are read when this method is
1192    /// called, and are not re-read later.
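    ///
    /// # Example
    ///
    /// An illustrative sketch (it mutates process-wide environment state, so
    /// it is marked `ignore` for doc tests):
    ///
    /// ```ignore
    /// std::env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");
    /// let config = datafusion_common::config::ConfigOptions::from_env().unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```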
1193    pub fn from_env() -> Result<Self> {
1194        struct Visitor(Vec<String>);
1195
1196        impl Visit for Visitor {
1197            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1198                self.0.push(key.to_string())
1199            }
1200
1201            fn none(&mut self, key: &str, _: &'static str) {
1202                self.0.push(key.to_string())
1203            }
1204        }
1205
1206        // Extract the names of all fields and then look up the corresponding
1207        // environment variables. This isn't hugely efficient but avoids
1208        // ambiguity between `a.b` and `a_b` which would both correspond
1209        // to an environment variable of `A_B`
1210
1211        let mut keys = Visitor(vec![]);
1212        let mut ret = Self::default();
1213        ret.visit(&mut keys, "datafusion", "");
1214
1215        for key in keys.0 {
1216            let env = key.to_uppercase().replace('.', "_");
1217            if let Some(var) = std::env::var_os(env) {
1218                let value = var.to_string_lossy();
1219                log::info!("Set {key} to {value} from the environment variable");
1220                ret.set(&key, value.as_ref())?;
1221            }
1222        }
1223
1224        Ok(ret)
1225    }
1226
1227    /// Create a new [`ConfigOptions`] struct, taking values from a string hash map.
1228    ///
1229    /// Only the built-in configurations will be extracted from the hash map
1230    /// and other key value pairs will be ignored.
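    ///
    /// # Example
    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_string(),
    ///     "1024".to_string(),
    /// )]);
    /// let config = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(config.execution.batch_size, 1024);
    /// ```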
1231    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1232        struct Visitor(Vec<String>);
1233
1234        impl Visit for Visitor {
1235            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
1236                self.0.push(key.to_string())
1237            }
1238
1239            fn none(&mut self, key: &str, _: &'static str) {
1240                self.0.push(key.to_string())
1241            }
1242        }
1243
1244        let mut keys = Visitor(vec![]);
1245        let mut ret = Self::default();
1246        ret.visit(&mut keys, "datafusion", "");
1247
1248        for key in keys.0 {
1249            if let Some(var) = settings.get(&key) {
1250                ret.set(&key, var)?;
1251            }
1252        }
1253
1254        Ok(ret)
1255    }
1256
1257    /// Returns the [`ConfigEntry`] values stored within this [`ConfigOptions`]
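    ///
    /// A sketch of listing every key, current value, and description:
    ///
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// for entry in ConfigOptions::new().entries() {
    ///     println!("{} = {:?}", entry.key, entry.value);
    /// }
    /// ```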
1258    pub fn entries(&self) -> Vec<ConfigEntry> {
1259        struct Visitor(Vec<ConfigEntry>);
1260
1261        impl Visit for Visitor {
1262            fn some<V: Display>(
1263                &mut self,
1264                key: &str,
1265                value: V,
1266                description: &'static str,
1267            ) {
1268                self.0.push(ConfigEntry {
1269                    key: key.to_string(),
1270                    value: Some(value.to_string()),
1271                    description,
1272                })
1273            }
1274
1275            fn none(&mut self, key: &str, description: &'static str) {
1276                self.0.push(ConfigEntry {
1277                    key: key.to_string(),
1278                    value: None,
1279                    description,
1280                })
1281            }
1282        }
1283
1284        let mut v = Visitor(vec![]);
1285        self.visit(&mut v, "datafusion", "");
1286
1287        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1288        v.0
1289    }
1290
1291    /// Generate documentation that can be included in the user guide
1292    pub fn generate_config_markdown() -> String {
1293        use std::fmt::Write as _;
1294
1295        let mut s = Self::default();
1296
1297        // Zero out host-dependent defaults (e.g. the core count) so the generated docs are stable
1298        s.execution.target_partitions = 0;
1299        s.execution.planning_concurrency = 0;
1300
1301        let mut docs = "| key | default | description |\n".to_string();
1302        docs += "|-----|---------|-------------|\n";
1303        let mut entries = s.entries();
1304        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
1305
1306        for entry in entries {
1307            let _ = writeln!(
1308                &mut docs,
1309                "| {} | {} | {} |",
1310                entry.key,
1311                entry.value.as_deref().unwrap_or("NULL"),
1312                entry.description
1313            );
1314        }
1315        docs
1316    }
1317}
1318
1319/// [`ConfigExtension`] provides a mechanism to store third-party configuration
1320/// within DataFusion [`ConfigOptions`]
1321///
1322/// This mechanism can be used to pass configuration to user defined functions
1323/// or optimizer passes
1324///
1325/// # Example
1326/// ```
1327/// use datafusion_common::{
1328///     config::ConfigExtension, config::ConfigOptions, extensions_options,
1329/// };
1330/// // Define a new configuration struct using the `extensions_options` macro
1331/// extensions_options! {
1332///    /// My own config options.
1333///    pub struct MyConfig {
1334///        /// Should "foo" be replaced by "bar"?
1335///        pub foo_to_bar: bool, default = true
1336///
1337///        /// How many "baz" should be created?
1338///        pub baz_count: usize, default = 1337
1339///    }
1340/// }
1341///
1342/// impl ConfigExtension for MyConfig {
1343///     const PREFIX: &'static str = "my_config";
1344/// }
1345///
1346/// // set up config struct and register extension
1347/// let mut config = ConfigOptions::default();
1348/// config.extensions.insert(MyConfig::default());
1349///
1350/// // overwrite config default
1351/// config.set("my_config.baz_count", "42").unwrap();
1352///
1353/// // check config state
1354/// let my_config = config.extensions.get::<MyConfig>().unwrap();
1355/// assert!(my_config.foo_to_bar);
1356/// assert_eq!(my_config.baz_count, 42);
1357/// ```
1358///
1359/// # Note:
1360/// Unfortunately associated constants are not currently object-safe, and so this
1361/// extends the object-safe [`ExtensionOptions`]
1362pub trait ConfigExtension: ExtensionOptions {
1363    /// Configuration namespace prefix to use
1364    ///
1365    /// All values under this will be prefixed with `$PREFIX + "."`
1366    const PREFIX: &'static str;
1367}
1368
1369/// An object-safe API for storing arbitrary configuration.
1370///
1371/// See [`ConfigExtension`] for user defined configuration
1372pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1373    /// Return `self` as [`Any`]
1374    ///
1375    /// This is needed until trait upcasting is stabilized
1376    fn as_any(&self) -> &dyn Any;
1377
1378    /// Return `self` as [`Any`]
1379    ///
1380    /// This is needed until trait upcasting is stabilized
1381    fn as_any_mut(&mut self) -> &mut dyn Any;
1382
1383    /// Return a deep clone of this [`ExtensionOptions`]
1384    ///
1385    /// It is important this does not share mutable state to avoid consistency issues
1386    /// with configuration changing whilst queries are executing
1387    fn cloned(&self) -> Box<dyn ExtensionOptions>;
1388
1389    /// Set the given `key`, `value` pair
1390    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1391
1392    /// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
1393    fn entries(&self) -> Vec<ConfigEntry>;
1394}
1395
1396/// A type-safe container for [`ConfigExtension`]
1397#[derive(Debug, Default, Clone)]
1398pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1399
1400impl Extensions {
1401    /// Create a new, empty [`Extensions`]
1402    pub fn new() -> Self {
1403        Self(BTreeMap::new())
1404    }
1405
1406    /// Registers a [`ConfigExtension`] with this [`Extensions`] collection
1407    pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1408        assert_ne!(T::PREFIX, "datafusion");
1409        let e = ExtensionBox(Box::new(extension));
1410        self.0.insert(T::PREFIX, e);
1411    }
1412
1413    /// Retrieves the extension of the given type if any
1414    pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1415        self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1416    }
1417
1418    /// Retrieves the extension of the given type if any
1419    pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1420        let e = self.0.get_mut(T::PREFIX)?;
1421        e.0.as_any_mut().downcast_mut()
1422    }
1423}
1424
1425#[derive(Debug)]
1426struct ExtensionBox(Box<dyn ExtensionOptions>);
1427
1428impl Clone for ExtensionBox {
1429    fn clone(&self) -> Self {
1430        Self(self.0.cloned())
1431    }
1432}
1433
1434/// A trait implemented by `config_namespace` and for field types that provides
1435/// the ability to walk and mutate the configuration tree
1436pub trait ConfigField {
1437    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1438
1439    fn set(&mut self, key: &str, value: &str) -> Result<()>;
1440}
1441
1442impl<F: ConfigField + Default> ConfigField for Option<F> {
1443    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1444        match self {
1445            Some(s) => s.visit(v, key, description),
1446            None => v.none(key, description),
1447        }
1448    }
1449
1450    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1451        self.get_or_insert_with(Default::default).set(key, value)
1452    }
1453}
1454
1455/// Default transformation to parse a [`ConfigField`] for a string.
1456///
1457/// This uses [`FromStr`] to parse the data.
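///
/// # Example
///
/// A minimal sketch:
///
/// ```
/// use datafusion_common::config::default_config_transform;
///
/// let parsed: usize = default_config_transform("42").unwrap();
/// assert_eq!(parsed, 42);
/// ```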
1458pub fn default_config_transform<T>(input: &str) -> Result<T>
1459where
1460    T: FromStr,
1461    <T as FromStr>::Err: Sync + Send + Error + 'static,
1462{
1463    input.parse().map_err(|e| {
1464        DataFusionError::Context(
1465            format!(
1466                "Error parsing '{}' as {}",
1467                input,
1468                std::any::type_name::<T>()
1469            ),
1470            Box::new(DataFusionError::External(Box::new(e))),
1471        )
1472    })
1473}
1474
1475/// Macro that generates [`ConfigField`] for a given type.
1476///
1477/// # Usage
1478/// This always requires [`Display`] to be implemented for the given type.
1479///
1480/// There are two ways to invoke this macro. The first one uses
1481/// [`default_config_transform`]/[`FromStr`] to parse the data:
1482///
1483/// ```ignore
1484/// config_field(MyType);
1485/// ```
1486///
1487/// Note that the parsing error MUST implement [`std::error::Error`]!
1488///
1489/// Or you can specify how you want to parse an [`str`] into the type:
1490///
1491/// ```ignore
1492/// fn parse_it(s: &str) -> Result<MyType> {
1493///     ...
1494/// }
1495///
1496/// config_field(
1497///     MyType,
1498///     value => parse_it(value)
1499/// );
1500/// ```
1501#[macro_export]
1502macro_rules! config_field {
1503    ($t:ty) => {
1504        config_field!($t, value => $crate::config::default_config_transform(value)?);
1505    };
1506
1507    ($t:ty, $arg:ident => $transform:expr) => {
1508        impl $crate::config::ConfigField for $t {
1509            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1510                v.some(key, self, description)
1511            }
1512
1513            fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1514                *self = $transform;
1515                Ok(())
1516            }
1517        }
1518    };
1519}
1520
1521config_field!(String);
1522config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1523config_field!(usize);
1524config_field!(f64);
1525config_field!(u64);
1526
1527impl ConfigField for u8 {
1528    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1529        v.some(key, self, description)
1530    }
1531
1532    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1533        if value.is_empty() {
1534            return Err(DataFusionError::Configuration(format!(
1535                "Input string for {key} key is empty"
1536            )));
1537        }
1538        // Check if the string is a valid number
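        // e.g. "44" parses numerically to b',', while "," falls through to
        // the single ASCII character branch below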
1539        if let Ok(num) = value.parse::<u8>() {
1540            // TODO: Let's decide how we treat the numerical strings.
1541            *self = num;
1542        } else {
1543            let bytes = value.as_bytes();
1544            // Check if the first character is ASCII (single byte)
1545            if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1546                return Err(DataFusionError::Configuration(format!(
1547                    "Error parsing {value} as u8. Expected a number or a single ASCII character"
1548                )));
1549            }
1550            *self = bytes[0];
1551        }
1552        Ok(())
1553    }
1554}
1555
1556impl ConfigField for CompressionTypeVariant {
1557    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1558        v.some(key, self, description)
1559    }
1560
1561    fn set(&mut self, _: &str, value: &str) -> Result<()> {
1562        *self = CompressionTypeVariant::from_str(value)?;
1563        Ok(())
1564    }
1565}
1566
1567/// An implementation trait used to recursively walk configuration
1568pub trait Visit {
1569    fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1570
1571    fn none(&mut self, key: &str, description: &'static str);
1572}
1573
1574/// Convenience macro to create [`ExtensionOptions`].
1575///
1576/// The created structure implements the following traits:
1577///
1578/// - [`Clone`]
1579/// - [`Debug`]
1580/// - [`Default`]
1581/// - [`ExtensionOptions`]
1582///
1583/// # Usage
1584/// The syntax is:
1585///
1586/// ```text
1587/// extensions_options! {
1588///      /// Struct docs (optional).
1589///     [<vis>] struct <StructName> {
1590///         /// Field docs (optional)
1591///         [<vis>] <field_name>: <field_type>, default = <default_value>
1592///
1593///         ... more fields
1594///     }
1595/// }
1596/// ```
1597///
1598/// The placeholders are:
1599/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1600/// - `<StructName>`: Struct name like `MyStruct`.
1601/// - `<field_name>`: Field name like `my_field`.
1602/// - `<field_type>`: Field type like `u8`.
1603/// - `<default_value>`: Default value matching the field type like `42`.
1604///
1605/// # Example
1606/// See also a full example on the [`ConfigExtension`] documentation
1607///
1608/// ```
1609/// use datafusion_common::extensions_options;
1610///
1611/// extensions_options! {
1612///     /// My own config options.
1613///     pub struct MyConfig {
1614///         /// Should "foo" be replaced by "bar"?
1615///         pub foo_to_bar: bool, default = true
1616///
1617///         /// How many "baz" should be created?
1618///         pub baz_count: usize, default = 1337
1619///     }
1620/// }
1621/// ```
1622///
1623///
1624/// [`Debug`]: std::fmt::Debug
1625/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1626#[macro_export]
1627macro_rules! extensions_options {
1628    (
1629     $(#[doc = $struct_d:tt])*
1630     $vis:vis struct $struct_name:ident {
1631        $(
1632        $(#[doc = $d:tt])*
1633        $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1634        )*$(,)*
1635    }
1636    ) => {
1637        $(#[doc = $struct_d])*
1638        #[derive(Debug, Clone)]
1639        #[non_exhaustive]
1640        $vis struct $struct_name{
1641            $(
1642            $(#[doc = $d])*
1643            $field_vis $field_name : $field_type,
1644            )*
1645        }
1646
1647        impl Default for $struct_name {
1648            fn default() -> Self {
1649                Self {
1650                    $($field_name: $default),*
1651                }
1652            }
1653        }
1654
1655        impl $crate::config::ExtensionOptions for $struct_name {
1656            fn as_any(&self) -> &dyn ::std::any::Any {
1657                self
1658            }
1659
1660            fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1661                self
1662            }
1663
1664            fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1665                Box::new(self.clone())
1666            }
1667
1668            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1669                $crate::config::ConfigField::set(self, key, value)
1670            }
1671
1672            fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1673                struct Visitor(Vec<$crate::config::ConfigEntry>);
1674
1675                impl $crate::config::Visit for Visitor {
1676                    fn some<V: std::fmt::Display>(
1677                        &mut self,
1678                        key: &str,
1679                        value: V,
1680                        description: &'static str,
1681                    ) {
1682                        self.0.push($crate::config::ConfigEntry {
1683                            key: key.to_string(),
1684                            value: Some(value.to_string()),
1685                            description,
1686                        })
1687                    }
1688
1689                    fn none(&mut self, key: &str, description: &'static str) {
1690                        self.0.push($crate::config::ConfigEntry {
1691                            key: key.to_string(),
1692                            value: None,
1693                            description,
1694                        })
1695                    }
1696                }
1697
1698                let mut v = Visitor(vec![]);
1699                // The prefix is not used for extensions.
1700                // The description is generated in ConfigField::visit.
1701                // We can just pass empty strings here.
1702                $crate::config::ConfigField::visit(self, &mut v, "", "");
1703                v.0
1704            }
1705        }
1706
1707        impl $crate::config::ConfigField for $struct_name {
1708            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1709                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1710                match key {
1711                    $(
1712                        stringify!($field_name) => {
1713                            // Allow setting fields that are marked as
1714                            // deprecated without emitting warnings
1715                            {
1716                                #[allow(deprecated)]
1717                                self.$field_name.set(rem, value.as_ref())
1718                            }
1719                        },
1720                    )*
1721                    _ => return $crate::error::_config_err!(
1722                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1723                    )
1724                }
1725            }
1726
1727            fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1728                $(
1729                    let key = stringify!($field_name).to_string();
1730                    let desc = concat!($($d),*).trim();
1731                    #[allow(deprecated)]
1732                    self.$field_name.visit(v, key.as_str(), desc);
1733                )*
1734            }
1735        }
1736    }
1737}
1738
1739/// These file types have special built-in behavior for configuration.
1740/// Use [`TableOptions::extensions`] for configuring other file types.
1741#[derive(Debug, Clone)]
1742pub enum ConfigFileType {
1743    CSV,
1744    #[cfg(feature = "parquet")]
1745    PARQUET,
1746    JSON,
1747}
1748
1749/// Represents the configuration options available for handling different table formats within a data processing application.
1750/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1751/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
1752#[derive(Debug, Clone, Default)]
1753pub struct TableOptions {
1754    /// Configuration options for CSV file handling. This includes settings like the delimiter,
1755    /// quote character, and whether the first row is considered as headers.
1756    pub csv: CsvOptions,
1757
1758    /// Configuration options for Parquet file handling. This includes settings for compression,
1759    /// encoding, and other Parquet-specific file characteristics.
1760    pub parquet: TableParquetOptions,
1761
1762    /// Configuration options for JSON file handling.
1763    pub json: JsonOptions,
1764
1765    /// The current file format that the table operations should assume. This option allows
1766    /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1767    pub current_format: Option<ConfigFileType>,
1768
1769    /// Optional extensions that can be used to extend or customize the behavior of the table
1770    /// options. Extensions can be registered using `Extensions::insert` and might include
1771    /// custom file handling logic, additional configuration parameters, or other enhancements.
1772    pub extensions: Extensions,
1773}
1774
1775impl ConfigField for TableOptions {
1776    /// Visits configuration settings for the current file format, or all formats if none is selected.
1777    ///
1778    /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1779    /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1780    /// it visits all available format settings.
1781    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1782        if let Some(file_type) = &self.current_format {
1783            match file_type {
1784                #[cfg(feature = "parquet")]
1785                ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1786                ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1787                ConfigFileType::JSON => self.json.visit(v, "format", ""),
1788            }
1789        } else {
1790            self.csv.visit(v, "csv", "");
1791            self.parquet.visit(v, "parquet", "");
1792            self.json.visit(v, "json", "");
1793        }
1794    }
1795
1796    /// Sets a configuration value for a specific key within `TableOptions`.
1797    ///
1798    /// This method delegates setting configuration values to the specific file format configurations,
1799    /// based on the current format selected. If no format is selected, it returns an error.
1800    ///
1801    /// # Parameters
1802    ///
1803    /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1804    ///   for CSV format.
1805    /// * `value`: The value to set for the specified configuration key.
1806    ///
1807    /// # Returns
1808    ///
1809    /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1810    /// or if setting the configuration value fails for the specific format.
1811    fn set(&mut self, key: &str, value: &str) -> Result<()> {
1812        // Extensions are handled in the public `ConfigOptions::set`
1813        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1814        match key {
1815            "format" => {
1816                let Some(format) = &self.current_format else {
1817                    return _config_err!("Specify a format for TableOptions");
1818                };
1819                match format {
1820                    #[cfg(feature = "parquet")]
1821                    ConfigFileType::PARQUET => self.parquet.set(rem, value),
1822                    ConfigFileType::CSV => self.csv.set(rem, value),
1823                    ConfigFileType::JSON => self.json.set(rem, value),
1824                }
1825            }
1826            _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1827        }
1828    }
1829}
1830
1831impl TableOptions {
1832    /// Constructs a new instance of `TableOptions` with default settings.
1833    ///
1834    /// # Returns
1835    ///
1836    /// A new `TableOptions` instance with default configuration values.
1837    pub fn new() -> Self {
1838        Self::default()
1839    }
1840
1841    /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1842    ///
1843    /// # Parameters
1844    ///
1845    /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1846    ///
1847    /// # Returns
1848    ///
1849    /// A new `TableOptions` instance with settings applied from the session config.
1850    pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1851        let initial = TableOptions::default();
1852        initial.combine_with_session_config(config)
1853    }
1854
1855    /// Updates the current `TableOptions` with settings from a given session config.
1856    ///
1857    /// # Parameters
1858    ///
1859    /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1860    ///
1861    /// # Returns
1862    ///
1863    /// A new `TableOptions` instance with updated settings from the session config.
1864    #[must_use = "this method returns a new instance"]
1865    pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1866        let mut clone = self.clone();
1867        clone.parquet.global = config.execution.parquet.clone();
1868        clone
1869    }
1870
1871    /// Sets the file format for the table.
1872    ///
1873    /// # Parameters
1874    ///
1875    /// * `format`: The file format to use (e.g., CSV, Parquet).
1876    pub fn set_config_format(&mut self, format: ConfigFileType) {
1877        self.current_format = Some(format);
1878    }
1879
1880    /// Sets the extensions for this `TableOptions` instance.
1881    ///
1882    /// # Parameters
1883    ///
1884    /// * `extensions`: The `Extensions` instance to set.
1885    ///
1886    /// # Returns
1887    ///
1888    /// A new `TableOptions` instance with the specified extensions applied.
1889    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1890        self.extensions = extensions;
1891        self
1892    }
1893
1894    /// Sets a specific configuration option.
1895    ///
1896    /// # Parameters
1897    ///
1898    /// * `key`: The configuration key (e.g., "format.delimiter").
1899    /// * `value`: The value to set for the specified key.
1900    ///
1901    /// # Returns
1902    ///
1903    /// A result indicating success or failure in setting the configuration option.
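    ///
    /// # Example
    ///
    /// A minimal sketch; a format must be selected before `format.` keys apply:
    ///
    /// ```
    /// use datafusion_common::config::{ConfigFileType, TableOptions};
    ///
    /// let mut opts = TableOptions::new();
    /// opts.set_config_format(ConfigFileType::CSV);
    /// opts.set("format.delimiter", ";").unwrap();
    /// assert_eq!(opts.csv.delimiter, b';');
    /// ```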
1904    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1905        let Some((prefix, _)) = key.split_once('.') else {
1906            return _config_err!("could not find config namespace for key \"{key}\"");
1907        };
1908
1909        if prefix == "format" {
1910            return ConfigField::set(self, key, value);
1911        }
1912
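        // Session-level "execution." keys may arrive alongside table options;
        // they are applied via `combine_with_session_config` instead, so
        // accept and ignore them here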
1913        if prefix == "execution" {
1914            return Ok(());
1915        }
1916
1917        let Some(e) = self.extensions.0.get_mut(prefix) else {
1918            return _config_err!("Could not find config namespace \"{prefix}\"");
1919        };
1920        e.0.set(key, value)
1921    }
1922
1923    /// Initializes a new `TableOptions` from a hash map of string settings.
1924    ///
1925    /// # Parameters
1926    ///
1927    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1928    ///
1929    /// # Returns
1930    ///
1931    /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1932    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1933        let mut ret = Self::default();
1934        for (k, v) in settings {
1935            ret.set(k, v)?;
1936        }
1937
1938        Ok(ret)
1939    }
1940
1941    /// Modifies the current `TableOptions` instance with settings from a hash map.
1942    ///
1943    /// # Parameters
1944    ///
1945    /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1946    ///
1947    /// # Returns
1948    ///
1949    /// A result indicating success or failure in applying the settings.
1950    pub fn alter_with_string_hash_map(
1951        &mut self,
1952        settings: &HashMap<String, String>,
1953    ) -> Result<()> {
1954        for (k, v) in settings {
1955            self.set(k, v)?;
1956        }
1957        Ok(())
1958    }
1959
1960    /// Retrieves all configuration entries from this `TableOptions`.
1961    ///
1962    /// # Returns
1963    ///
1964    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1965    pub fn entries(&self) -> Vec<ConfigEntry> {
1966        struct Visitor(Vec<ConfigEntry>);
1967
1968        impl Visit for Visitor {
1969            fn some<V: Display>(
1970                &mut self,
1971                key: &str,
1972                value: V,
1973                description: &'static str,
1974            ) {
1975                self.0.push(ConfigEntry {
1976                    key: key.to_string(),
1977                    value: Some(value.to_string()),
1978                    description,
1979                })
1980            }
1981
1982            fn none(&mut self, key: &str, description: &'static str) {
1983                self.0.push(ConfigEntry {
1984                    key: key.to_string(),
1985                    value: None,
1986                    description,
1987                })
1988            }
1989        }
1990
1991        let mut v = Visitor(vec![]);
1992        self.visit(&mut v, "format", "");
1993
1994        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1995        v.0
1996    }
1997}
1998
1999/// Options that control how Parquet files are read, including global options
2000/// that apply to all columns and optional column-specific overrides
2001///
2002/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
2003/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
2004/// (e.g. sorting_columns).
2005#[derive(Clone, Default, Debug, PartialEq)]
2006pub struct TableParquetOptions {
2007    /// Global Parquet options that propagate to all columns.
2008    pub global: ParquetOptions,
2009    /// Column specific options, keyed by column name. Default usage is 'parquet.<option>::<column>'.
2010    pub column_specific_options: HashMap<String, ParquetColumnOptions>,
2011    /// Additional file-level metadata to include. Inserted into the key_value_metadata
2012    /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
2013    ///
2014    /// Multiple entries are permitted
2015    /// ```sql
2016    /// OPTIONS (
2017    ///    'format.metadata::key1' '',
2018    ///    'format.metadata::key2' 'value',
2019    ///    'format.metadata::key3' 'value has spaces',
2020    ///    'format.metadata::key4' 'value has special chars :: :',
2021    ///    'format.metadata::key_dupe' 'original will be overwritten',
2022    ///    'format.metadata::key_dupe' 'final'
2023    /// )
2024    /// ```
2025    pub key_value_metadata: HashMap<String, Option<String>>,
2026    /// Options for configuring Parquet modular encryption
2027    ///
2028    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
2029    /// See [`ConfigFileEncryptionProperties`] and [`ConfigFileDecryptionProperties`] in this module.
2030    /// These can be set via 'format.crypto', for example:
2031    /// ```sql
2032    /// OPTIONS (
2033    ///    'format.crypto.file_encryption.encrypt_footer' 'true',
2034    ///    'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435',  -- b"0123456789012345"
2035    ///    'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2036    ///    'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2037    ///     -- Same for decryption
2038    ///    'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2039    ///    'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2040    ///    'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2041    /// )
2042    /// ```
2043    /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
2044    /// Note that keys must be provided in hex format since they are binary strings.
2045    pub crypto: ParquetEncryptionOptions,
2046}
2047
2048impl TableParquetOptions {
2049    /// Return new default TableParquetOptions
2050    pub fn new() -> Self {
2051        Self::default()
2052    }
2053
2054    /// Set whether the encoding of the arrow metadata should occur
2055    /// during the writing of parquet.
2056    ///
2057    /// Default is to encode the arrow schema in the file kv_metadata.
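    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use datafusion_common::config::TableParquetOptions;
    ///
    /// let opts = TableParquetOptions::new().with_skip_arrow_metadata(true);
    /// assert!(opts.global.skip_arrow_metadata);
    /// ```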
2058    pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2059        Self {
2060            global: ParquetOptions {
2061                skip_arrow_metadata: skip,
2062                ..self.global
2063            },
2064            ..self
2065        }
2066    }
2067
2068    /// Retrieves all configuration entries from this `TableParquetOptions`.
2069    ///
2070    /// # Returns
2071    ///
2072    /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableParquetOptions`.
2073    pub fn entries(&self) -> Vec<ConfigEntry> {
2074        struct Visitor(Vec<ConfigEntry>);
2075
2076        impl Visit for Visitor {
2077            fn some<V: Display>(
2078                &mut self,
2079                key: &str,
2080                value: V,
2081                description: &'static str,
2082            ) {
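                // Visited with an empty prefix, so keys arrive as ".<key>";
                // strip the leading '.'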
2083                self.0.push(ConfigEntry {
2084                    key: key[1..].to_string(),
2085                    value: Some(value.to_string()),
2086                    description,
2087                })
2088            }
2089
2090            fn none(&mut self, key: &str, description: &'static str) {
2091                self.0.push(ConfigEntry {
2092                    key: key[1..].to_string(),
2093                    value: None,
2094                    description,
2095                })
2096            }
2097        }
2098
2099        let mut v = Visitor(vec![]);
2100        self.visit(&mut v, "", "");
2101
2102        v.0
2103    }
2104}
2105
2106impl ConfigField for TableParquetOptions {
2107    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2108        self.global.visit(v, key_prefix, description);
2109        self.column_specific_options
2110            .visit(v, key_prefix, description);
2111        self.crypto
2112            .visit(v, &format!("{key_prefix}.crypto"), description);
2113    }
2114
2115    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2116        // Determine if the key is a global, metadata, or column-specific setting
2117        if key.starts_with("metadata::") {
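            // e.g. "metadata::owner" inserts the entry ("owner", value) into
            // the file-level key_value_metadata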
2118            let k = match key.split("::").collect::<Vec<_>>()[..] {
2119                [_meta] | [_meta, ""] => {
2120                    return _config_err!(
2121                        "Invalid metadata key provided, missing key in metadata::<key>"
2122                    )
2123                }
2124                [_meta, k] => k.into(),
2125                _ => {
2126                    return _config_err!(
2127                        "Invalid metadata key provided, found too many '::' in \"{key}\""
2128                    )
2129                }
2130            };
2131            self.key_value_metadata.insert(k, Some(value.into()));
2132            Ok(())
2133        } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2134            self.crypto.set(crypto_feature, value)
2135        } else if key.contains("::") {
2136            self.column_specific_options.set(key, value)
2137        } else {
2138            self.global.set(key, value)
2139        }
2140    }
2141}
2142
2143macro_rules! config_namespace_with_hashmap {
2144    (
2145     $(#[doc = $struct_d:tt])*
2146     $(#[deprecated($($struct_depr:tt)*)])?  // Optional struct-level deprecated attribute
2147     $vis:vis struct $struct_name:ident {
2148        $(
2149        $(#[doc = $d:tt])*
2150        $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2151        $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2152        )*$(,)*
2153    }
2154    ) => {
2155
2156        $(#[doc = $struct_d])*
2157        $(#[deprecated($($struct_depr)*)])?  // Apply struct deprecation
2158        #[derive(Debug, Clone, PartialEq)]
2159        $vis struct $struct_name{
2160            $(
2161            $(#[doc = $d])*
2162            $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2163            $field_vis $field_name : $field_type,
2164            )*
2165        }
2166
2167        impl ConfigField for $struct_name {
2168            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2169                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2170                match key {
2171                    $(
2172                       stringify!($field_name) => {
2173                           // Handle deprecated fields
2174                           #[allow(deprecated)] // Allow deprecated fields
2175                           $(let value = $transform(value);)?
2176                           self.$field_name.set(rem, value.as_ref())
2177                       },
2178                    )*
2179                    _ => _config_err!(
2180                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2181                    )
2182                }
2183            }
2184
2185            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2186                $(
2187                let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2188                let desc = concat!($($d),*).trim();
2189                // Handle deprecated fields
2190                #[allow(deprecated)]
2191                self.$field_name.visit(v, key.as_str(), desc);
2192                )*
2193            }
2194        }
2195
2196        impl Default for $struct_name {
2197            fn default() -> Self {
2198                #[allow(deprecated)]
2199                Self {
2200                    $($field_name: $default),*
2201                }
2202            }
2203        }
2204
2205        impl ConfigField for HashMap<String,$struct_name> {
2206            fn set(&mut self, key: &str, value: &str) -> Result<()> {
2207                let parts: Vec<&str> = key.splitn(2, "::").collect();
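                // e.g. "compression::col1" splits into the inner key
                // "compression" and the hashmap (column) key "col1"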
2208                match parts.as_slice() {
2209                    [inner_key, hashmap_key] => {
2210                        // Get or create the struct for the specified key
2211                        let inner_value = self
2212                            .entry((*hashmap_key).to_owned())
2213                            .or_insert_with($struct_name::default);
2214
2215                        inner_value.set(inner_key, value)
2216                    }
2217                    _ => _config_err!("Unrecognized key '{key}'."),
2218                }
2219            }
2220
2221            fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2222                for (column_name, col_options) in self {
2223                    $(
2224                    let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2225                    let desc = concat!($($d),*).trim();
2226                    #[allow(deprecated)]
2227                    col_options.$field_name.visit(v, key.as_str(), desc);
2228                    )*
2229                }
2230            }
2231        }
2232    }
2233}
2234
2235config_namespace_with_hashmap! {
2236    /// Options controlling parquet format for individual columns.
2237    ///
2238    /// See [`ParquetOptions`] for more details
2239    pub struct ParquetColumnOptions {
2240        /// Sets if bloom filter is enabled for the column path.
2241        pub bloom_filter_enabled: Option<bool>, default = None
2242
2243        /// Sets encoding for the column path.
2244        /// Valid values are: plain, plain_dictionary, rle,
2245        /// bit_packed, delta_binary_packed, delta_length_byte_array,
2246        /// delta_byte_array, rle_dictionary, and byte_stream_split.
2247        /// These values are not case-sensitive. If NULL, uses
2248        /// default parquet options
2249        pub encoding: Option<String>, default = None
2250
2251        /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2252        /// default parquet options
2253        pub dictionary_enabled: Option<bool>, default = None
2254
2255        /// Sets default parquet compression codec for the column path.
2256        /// Valid values are: uncompressed, snappy, gzip(level),
2257        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2258        /// These values are not case-sensitive. If NULL, uses
2259        /// default parquet options
2260        pub compression: Option<String>, transform = str::to_lowercase, default = None
2261
2262        /// Sets if statistics are enabled for the column
2263        /// Valid values are: "none", "chunk", and "page"
2264        /// These values are not case sensitive. If NULL, uses
2265        /// default parquet options
2266        pub statistics_enabled: Option<String>, default = None
2267
2268        /// Sets bloom filter false positive probability for the column path. If NULL, uses
2269        /// default parquet options
2270        pub bloom_filter_fpp: Option<f64>, default = None
2271
2272        /// Sets bloom filter number of distinct values. If NULL, uses
2273        /// default parquet options
2274        pub bloom_filter_ndv: Option<u64>, default = None
2275    }
2276}
2277
2278#[derive(Clone, Debug, PartialEq)]
2279pub struct ConfigFileEncryptionProperties {
2280    /// Should the parquet footer be encrypted
2281    /// default is true
2282    pub encrypt_footer: bool,
2283    /// Key to use for the parquet footer encoded in hex format
2284    pub footer_key_as_hex: String,
2285    /// Metadata information for footer key
2286    pub footer_key_metadata_as_hex: String,
2287    /// HashMap of column names --> (key in hex format, metadata)
2288    pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2289    /// AAD prefix string uniquely identifies the file and prevents file swapping
2290    pub aad_prefix_as_hex: String,
2291    /// If true, store the AAD prefix in the file
2292    /// default is false
2293    pub store_aad_prefix: bool,
2294}
2295
2296// Setup to match EncryptionPropertiesBuilder::new()
2297impl Default for ConfigFileEncryptionProperties {
2298    fn default() -> Self {
2299        ConfigFileEncryptionProperties {
2300            encrypt_footer: true,
2301            footer_key_as_hex: String::new(),
2302            footer_key_metadata_as_hex: String::new(),
2303            column_encryption_properties: Default::default(),
2304            aad_prefix_as_hex: String::new(),
2305            store_aad_prefix: false,
2306        }
2307    }
2308}
2309
2310config_namespace_with_hashmap! {
2311    pub struct ColumnEncryptionProperties {
2312        /// Per column encryption key
2313        pub column_key_as_hex: String, default = "".to_string()
2314        /// Per column encryption key metadata
2315        pub column_metadata_as_hex: Option<String>, default = None
2316    }
2317}
2318
2319impl ConfigField for ConfigFileEncryptionProperties {
2320    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2321        let key = format!("{key_prefix}.encrypt_footer");
2322        let desc = "Encrypt the footer";
2323        self.encrypt_footer.visit(v, key.as_str(), desc);
2324
2325        let key = format!("{key_prefix}.footer_key_as_hex");
2326        let desc = "Key to use for the parquet footer";
2327        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2328
2329        let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2330        let desc = "Metadata to use for the parquet footer";
2331        self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2332
2333        self.column_encryption_properties.visit(v, key_prefix, desc);
2334
2335        let key = format!("{key_prefix}.aad_prefix_as_hex");
2336        let desc = "AAD prefix to use";
2337        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2338
2339        let key = format!("{key_prefix}.store_aad_prefix");
2340        let desc = "If true, store the AAD prefix";
2341        self.store_aad_prefix.visit(v, key.as_str(), desc);
2344    }
2345
2346    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2347        // Any hex encoded values must be pre-encoded using
2348        // hex::encode() before calling set.
2349
2350        if key.contains("::") {
2351            // Handle any column specific properties
2352            return self.column_encryption_properties.set(key, value);
2353        };
2354
2355        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2356        match key {
2357            "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2358            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2359            "footer_key_metadata_as_hex" => {
2360                self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2361            }
2362            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2363            "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2364            _ => _config_err!(
2365                "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2366                key
2367            ),
2368        }
2369    }
2370}
2371
2372#[cfg(feature = "parquet_encryption")]
2373impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2374    fn from(val: ConfigFileEncryptionProperties) -> Self {
2375        let mut fep = FileEncryptionProperties::builder(
2376            hex::decode(val.footer_key_as_hex).unwrap(),
2377        )
2378        .with_plaintext_footer(!val.encrypt_footer)
2379        .with_aad_prefix_storage(val.store_aad_prefix);
2380
2381        if !val.footer_key_metadata_as_hex.is_empty() {
2382            fep = fep.with_footer_key_metadata(
2383                hex::decode(&val.footer_key_metadata_as_hex)
2384                    .expect("Invalid footer key metadata"),
2385            );
2386        }
2387
2388        for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2389            let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2390                .expect("Invalid column encryption key");
2391            let key_metadata = encryption_props
2392                .column_metadata_as_hex
2393                .as_ref()
2394                .map(|x| hex::decode(x).expect("Invalid column metadata"));
2395            match key_metadata {
2396                Some(key_metadata) => {
2397                    fep = fep.with_column_key_and_metadata(
2398                        column_name,
2399                        encryption_key,
2400                        key_metadata,
2401                    );
2402                }
2403                None => {
2404                    fep = fep.with_column_key(column_name, encryption_key);
2405                }
2406            }
2407        }
2408
2409        if !val.aad_prefix_as_hex.is_empty() {
2410            let aad_prefix: Vec<u8> =
2411                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2412            fep = fep.with_aad_prefix(aad_prefix);
2413        }
2414        Arc::unwrap_or_clone(fep.build().unwrap())
2415    }
2416}
2417
2418#[cfg(feature = "parquet_encryption")]
2419impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
2420    fn from(f: &Arc<FileEncryptionProperties>) -> Self {
2421        let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2422
2423        let mut column_encryption_properties: HashMap<
2424            String,
2425            ColumnEncryptionProperties,
2426        > = HashMap::new();
2427
2428        for (i, column_name) in column_names_vec.iter().enumerate() {
2429            let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2430            let column_metadata_as_hex: Option<String> =
2431                column_metas_vec.get(i).map(hex::encode);
2432            column_encryption_properties.insert(
2433                column_name.clone(),
2434                ColumnEncryptionProperties {
2435                    column_key_as_hex,
2436                    column_metadata_as_hex,
2437                },
2438            );
2439        }
2440        let mut aad_prefix: Vec<u8> = Vec::new();
2441        if let Some(prefix) = f.aad_prefix() {
2442            aad_prefix = prefix.clone();
2443        }
2444        ConfigFileEncryptionProperties {
2445            encrypt_footer: f.encrypt_footer(),
2446            footer_key_as_hex: hex::encode(f.footer_key()),
2447            footer_key_metadata_as_hex: f
2448                .footer_key_metadata()
2449                .map(hex::encode)
2450                .unwrap_or_default(),
2451            column_encryption_properties,
2452            aad_prefix_as_hex: hex::encode(aad_prefix),
2453            store_aad_prefix: f.store_aad_prefix(),
2454        }
2455    }
2456}
2457
2458#[derive(Clone, Debug, PartialEq)]
2459pub struct ConfigFileDecryptionProperties {
2460    /// Key to use for the parquet footer, encoded in hex format
2461    pub footer_key_as_hex: String,
2462    /// HashMap of column names --> key in hex format
2463    pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2464    /// AAD prefix string uniquely identifies the file and prevents file swapping
2465    pub aad_prefix_as_hex: String,
2466    /// If true, then verify signature for files with plaintext footers.
2467    /// default = true
2468    pub footer_signature_verification: bool,
2469}
2470
2471config_namespace_with_hashmap! {
2472    pub struct ColumnDecryptionProperties {
2473        /// Per column encryption key
2474        pub column_key_as_hex: String, default = "".to_string()
2475    }
2476}
2477
2478// Setup to match DecryptionPropertiesBuilder::new()
2479impl Default for ConfigFileDecryptionProperties {
2480    fn default() -> Self {
2481        ConfigFileDecryptionProperties {
2482            footer_key_as_hex: String::new(),
2483            column_decryption_properties: Default::default(),
2484            aad_prefix_as_hex: String::new(),
2485            footer_signature_verification: true,
2486        }
2487    }
2488}
2489
2490impl ConfigField for ConfigFileDecryptionProperties {
2491    fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2492        let key = format!("{key_prefix}.footer_key_as_hex");
2493        let desc = "Key to use for the parquet footer";
2494        self.footer_key_as_hex.visit(v, key.as_str(), desc);
2495
2496        let key = format!("{key_prefix}.aad_prefix_as_hex");
2497        let desc = "AAD prefix to use";
2498        self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2499
2500        let key = format!("{key_prefix}.footer_signature_verification");
2501        let desc = "If true, verify the footer signature";
2502        self.footer_signature_verification
2503            .visit(v, key.as_str(), desc);
2504
2505        self.column_decryption_properties.visit(v, key_prefix, desc);
2506    }
2507
2508    fn set(&mut self, key: &str, value: &str) -> Result<()> {
2509        // Any hex encoded values must be pre-encoded using
2510        // hex::encode() before calling set.
2511
2512        if key.contains("::") {
2513            // Handle any column specific properties
2514            return self.column_decryption_properties.set(key, value);
2515        };
2516
2517        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2518        match key {
2519            "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2520            "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2521            "footer_signature_verification" => {
2522                self.footer_signature_verification.set(rem, value.as_ref())
2523            }
2524            _ => _config_err!(
2525                "Config value \"{}\" not found on ConfigFileDecryptionProperties",
2526                key
2527            ),
2528        }
2529    }
2530}
2531
2532#[cfg(feature = "parquet_encryption")]
2533impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2534    fn from(val: ConfigFileDecryptionProperties) -> Self {
2535        let mut column_names: Vec<&str> = Vec::new();
2536        let mut column_keys: Vec<Vec<u8>> = Vec::new();
2537
2538        for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2539            column_names.push(col_name.as_str());
2540            column_keys.push(
2541                hex::decode(&decryption_properties.column_key_as_hex)
2542                    .expect("Invalid column decryption key"),
2543            );
2544        }
2545
2546        let mut fep = FileDecryptionProperties::builder(
2547            hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2548        )
2549        .with_column_keys(column_names, column_keys)
2550        .unwrap();
2551
2552        if !val.footer_signature_verification {
2553            fep = fep.disable_footer_signature_verification();
2554        }
2555
2556        if !val.aad_prefix_as_hex.is_empty() {
2557            let aad_prefix =
2558                hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2559            fep = fep.with_aad_prefix(aad_prefix);
2560        }
2561
2562        Arc::unwrap_or_clone(fep.build().unwrap())
2563    }
2564}

#[cfg(feature = "parquet_encryption")]
impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
    fn from(f: &Arc<FileDecryptionProperties>) -> Self {
        let (column_names_vec, column_keys_vec) = f.column_keys();
        let mut column_decryption_properties: HashMap<
            String,
            ColumnDecryptionProperties,
        > = HashMap::new();
        for (i, column_name) in column_names_vec.iter().enumerate() {
            let props = ColumnDecryptionProperties {
                column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
            };
            column_decryption_properties.insert(column_name.clone(), props);
        }

        let aad_prefix: Vec<u8> = f.aad_prefix().cloned().unwrap_or_default();

        ConfigFileDecryptionProperties {
            footer_key_as_hex: hex::encode(
                f.footer_key(None).unwrap_or_default().as_ref(),
            ),
            column_decryption_properties,
            aad_prefix_as_hex: hex::encode(aad_prefix),
            footer_signature_verification: f.check_plaintext_footer_integrity(),
        }
    }
}

/// Holds implementation-specific options for an encryption factory
#[derive(Clone, Debug, Default, PartialEq)]
pub struct EncryptionFactoryOptions {
    /// Key-value options understood by the configured encryption factory
    pub options: HashMap<String, String>,
}

impl ConfigField for EncryptionFactoryOptions {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
        for (option_key, option_value) in &self.options {
            v.some(
                &format!("{key}.{option_key}"),
                option_value,
                "Encryption factory specific option",
            );
        }
    }

    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        self.options.insert(key.to_owned(), value.to_owned());
        Ok(())
    }
}

impl EncryptionFactoryOptions {
    /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
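    ///
    /// A minimal sketch, assuming a hypothetical `MyFactoryOptions` type that
    /// implements [`ExtensionOptions`] and [`Default`], and an
    /// `EncryptionFactoryOptions` value `factory_options`:
    ///
    /// ```ignore
    /// let typed: MyFactoryOptions = factory_options.to_extension_options()?;
    /// ```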
    pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
        let mut options = T::default();
        for (key, value) in &self.options {
            options.set(key, value)?;
        }
        Ok(options)
    }
}

config_namespace! {
    /// Options controlling CSV format
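    ///
    /// A minimal usage sketch of the builder-style setters defined on this
    /// struct later in this file:
    ///
    /// ```ignore
    /// let options = CsvOptions::default()
    ///     .with_has_header(true)
    ///     .with_delimiter(b'|');
    /// ```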
    pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of column names). The value `None` indicates that
        /// the configuration should be consulted.
        pub has_header: Option<bool>, default = None
        /// The column delimiter character
        pub delimiter: u8, default = b','
        /// The quote character
        pub quote: u8, default = b'"'
        /// The row terminator character; `None` means CRLF
        pub terminator: Option<u8>, default = None
        /// The escape character, if any
        pub escape: Option<u8>, default = None
        /// Whether quotes inside quoted values are escaped by doubling
        pub double_quote: Option<bool>, default = None
        /// Specifies whether newlines in (quoted) values are supported.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        ///
        /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
        pub newlines_in_values: Option<bool>, default = None
        /// File compression type
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Maximum number of records to scan when inferring the schema
        pub schema_infer_max_rec: Option<usize>, default = None
        /// Format string for date values
        pub date_format: Option<String>, default = None
        /// Format string for datetime values
        pub datetime_format: Option<String>, default = None
        /// Format string for timestamp values
        pub timestamp_format: Option<String>, default = None
        /// Format string for timestamps with time zone
        pub timestamp_tz_format: Option<String>, default = None
        /// Format string for time values
        pub time_format: Option<String>, default = None
        /// The output format for nulls in the CSV writer
        pub null_value: Option<String>, default = None
        /// The input regex matching null values when loading CSVs
        pub null_regex: Option<String>, default = None
        /// The comment character; lines starting with this byte are ignored
        pub comment: Option<u8>, default = None
        /// Whether to allow truncated rows when parsing, both within a single file and across files.
        ///
        /// When set to false (the default), reading a single CSV file whose rows have different
        /// lengths will error, and reading multiple CSV files with different numbers of columns
        /// will also fail.
        ///
        /// When set to true, reading a single CSV file with rows of different lengths will pad
        /// the truncated rows with null values for the missing columns; when reading multiple
        /// CSV files with different numbers of columns, a union schema containing all columns
        /// found across the files is created, and any file missing columns has its rows padded
        /// with null values.
        pub truncated_rows: Option<bool>, default = None
    }
}

impl CsvOptions {
    /// Set the file compression type
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    pub fn with_compression(
        mut self,
        compression_type_variant: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression_type_variant;
        self
    }

    /// Set a limit on the number of records to scan when inferring the schema
    /// - defaults to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
    pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
        self.schema_infer_max_rec = Some(max_rec);
        self
    }

    /// Set true to indicate that the first line is a header
    /// - defaults to true
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = Some(has_header);
        self
    }

    /// Returns whether the first line is a header. If the format options do
    /// not specify whether there is a header, returns `None` (indicating that
    /// the configuration should be consulted).
    pub fn has_header(&self) -> Option<bool> {
        self.has_header
    }

    /// The character separating values within a row.
    /// - defaults to ','
    pub fn with_delimiter(mut self, delimiter: u8) -> Self {
        self.delimiter = delimiter;
        self
    }

    /// The quote character in a row.
    /// - defaults to '"'
    pub fn with_quote(mut self, quote: u8) -> Self {
        self.quote = quote;
        self
    }

    /// The character that terminates a row.
    /// - defaults to None (CRLF)
    pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
        self.terminator = terminator;
        self
    }

    /// The escape character in a row.
    /// - defaults to None
    pub fn with_escape(mut self, escape: Option<u8>) -> Self {
        self.escape = escape;
        self
    }

    /// Set true to indicate that quotes in CSV values are escaped by doubling them.
    /// - defaults to true
    pub fn with_double_quote(mut self, double_quote: bool) -> Self {
        self.double_quote = Some(double_quote);
        self
    }

    /// Specifies whether newlines in (quoted) values are supported.
    ///
    /// Parsing newlines in quoted values may be affected by execution behaviour such as
    /// parallel file scanning. Setting this to `true` ensures that newlines in values are
    /// parsed successfully, which may reduce performance.
    ///
    /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
    pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
        self.newlines_in_values = Some(newlines_in_values);
        self
    }

    /// Set the `CompressionTypeVariant` for CSV files
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
    pub fn with_file_compression_type(
        mut self,
        compression: CompressionTypeVariant,
    ) -> Self {
        self.compression = compression;
        self
    }

    /// Whether to allow truncated rows when parsing.
    ///
    /// By default this is false, and reading CSV rows of different lengths will error.
    /// When set to true, records with fewer than the expected number of columns are
    /// accepted, and the missing columns are filled with nulls. If the record's schema
    /// is not nullable, an error is still returned.
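    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// let options = CsvOptions::default().with_truncated_rows(true);
    /// ```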
    pub fn with_truncated_rows(mut self, allow: bool) -> Self {
        self.truncated_rows = Some(allow);
        self
    }

    /// The delimiter character.
    pub fn delimiter(&self) -> u8 {
        self.delimiter
    }

    /// The quote character.
    pub fn quote(&self) -> u8 {
        self.quote
    }

    /// The terminator character.
    pub fn terminator(&self) -> Option<u8> {
        self.terminator
    }

    /// The escape character.
    pub fn escape(&self) -> Option<u8> {
        self.escape
    }
}

config_namespace! {
    /// Options controlling JSON format
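    ///
    /// A minimal usage sketch; the fields are public and can be set directly:
    ///
    /// ```ignore
    /// let mut options = JsonOptions::default();
    /// options.compression = CompressionTypeVariant::GZIP;
    /// ```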
    pub struct JsonOptions {
        /// File compression type
        pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
        /// Maximum number of records to scan when inferring the schema
        pub schema_infer_max_rec: Option<usize>, default = None
    }
}

pub trait OutputFormatExt: Display {}

/// The output file format for writing query results, carrying format-specific
/// options where applicable
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum OutputFormat {
    CSV(CsvOptions),
    JSON(JsonOptions),
    #[cfg(feature = "parquet")]
    PARQUET(TableParquetOptions),
    AVRO,
    ARROW,
}

impl Display for OutputFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let out = match self {
            OutputFormat::CSV(_) => "csv",
            OutputFormat::JSON(_) => "json",
            #[cfg(feature = "parquet")]
            OutputFormat::PARQUET(_) => "parquet",
            OutputFormat::AVRO => "avro",
            OutputFormat::ARROW => "arrow",
        };
        write!(f, "{out}")
    }
}

#[cfg(test)]
mod tests {
    #[cfg(feature = "parquet")]
    use crate::config::TableParquetOptions;
    use crate::config::{
        ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
        Extensions, TableOptions,
    };
    use std::any::Any;
    use std::collections::HashMap;
    use std::sync::Arc;

    #[derive(Default, Debug, Clone)]
    pub struct TestExtensionConfig {
        /// Key-value properties recorded by `set`
        pub properties: HashMap<String, String>,
    }

    impl ExtensionOptions for TestExtensionConfig {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }

        fn cloned(&self) -> Box<dyn ExtensionOptions> {
            Box::new(self.clone())
        }

        fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
            let (key, rem) = key.split_once('.').unwrap_or((key, ""));
            assert_eq!(key, "test");
            self.properties.insert(rem.to_owned(), value.to_owned());
            Ok(())
        }

        fn entries(&self) -> Vec<ConfigEntry> {
            self.properties
                .iter()
                .map(|(k, v)| ConfigEntry {
                    key: k.into(),
                    value: Some(v.into()),
                    description: "",
                })
                .collect()
        }
    }

    impl ConfigExtension for TestExtensionConfig {
        const PREFIX: &'static str = "test";
    }

    #[test]
    fn create_table_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
    }

    #[test]
    fn alter_test_extension_config() {
        let mut extension = Extensions::new();
        extension.insert(TestExtensionConfig::default());
        let mut table_config = TableOptions::new().with_extensions(extension);
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter, b';');
        table_config.set("test.bootstrap.servers", "asd").unwrap();
        let kafka_config = table_config
            .extensions
            .get::<TestExtensionConfig>()
            .unwrap();
        assert_eq!(
            kafka_config.properties.get("bootstrap.servers").unwrap(),
            "asd"
        );
    }

    #[test]
    fn csv_u8_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::CSV);
        table_config.set("format.delimiter", ";").unwrap();
        assert_eq!(table_config.csv.delimiter as char, ';');
        table_config.set("format.escape", "\"").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '"');
        table_config.set("format.escape", "\'").unwrap();
        assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
    }

    #[test]
    fn warning_only_not_default() {
        use std::sync::atomic::AtomicUsize;
        static COUNT: AtomicUsize = AtomicUsize::new(0);
        use log::{Level, LevelFilter, Metadata, Record};
        struct SimpleLogger;
        impl log::Log for SimpleLogger {
            fn enabled(&self, metadata: &Metadata) -> bool {
                metadata.level() <= Level::Info
            }

            fn log(&self, record: &Record) {
                if self.enabled(record.metadata()) {
                    COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
            }
            fn flush(&self) {}
        }
        log::set_logger(&SimpleLogger).unwrap();
        log::set_max_level(LevelFilter::Info);
        let mut sql_parser_options = crate::config::SqlParserOptions::default();
        sql_parser_options
            .set("enable_options_value_normalization", "false")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
        sql_parser_options
            .set("enable_options_value_normalization", "true")
            .unwrap();
        assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        assert_eq!(
            table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
            Some(true)
        );
    }

    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_table_encryption() {
        use crate::config::{
            ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
        };
        use parquet::encryption::decrypt::FileDecryptionProperties;
        use parquet::encryption::encrypt::FileEncryptionProperties;

        let footer_key = b"0123456789012345".to_vec(); // 128-bit key (16 bytes)
        let column_names = vec!["double_field", "float_field"];
        let column_keys =
            vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];

        let file_encryption_properties =
            FileEncryptionProperties::builder(footer_key.clone())
                .with_column_keys(column_names.clone(), column_keys.clone())
                .unwrap()
                .build()
                .unwrap();

        let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
            .with_column_keys(column_names.clone(), column_keys.clone())
            .unwrap()
            .build()
            .unwrap();

        // Test round-trip
        let config_encrypt =
            ConfigFileEncryptionProperties::from(&file_encryption_properties);
        let encryption_properties_built =
            Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
        assert_eq!(file_encryption_properties, encryption_properties_built);

        let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
        let decryption_properties_built =
            Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
        assert_eq!(decryption_properties, decryption_properties_built);

        ///////////////////////////////////////////////////////////////////////////////////
        // Test encryption config

        // Display original encryption config
        // println!("{:#?}", config_encrypt);

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_encryption.encrypt_footer",
                config_encrypt.encrypt_footer.to_string().as_str(),
            )
            .unwrap();
        table_config
            .parquet
            .set(
                "crypto.file_encryption.footer_key_as_hex",
                config_encrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final encryption config
        // println!("{:#?}", table_config.parquet.crypto.file_encryption);

        assert_eq!(
            table_config.parquet.crypto.file_encryption,
            Some(config_encrypt)
        );

        ///////////////////////////////////////////////////////////////////////////////////
        // Test decryption config

        // Display original decryption config
        // println!("{:#?}", config_decrypt);

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_decryption.footer_key_as_hex",
                config_decrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final decryption config
        // println!("{:#?}", table_config.parquet.crypto.file_decryption);

        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );

        // Set config directly
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );
    }

    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_encryption_factory_config() {
        let mut parquet_options = TableParquetOptions::default();

        assert_eq!(parquet_options.crypto.factory_id, None);
        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);

        let mut input_config = TestExtensionConfig::default();
        input_config
            .properties
            .insert("key1".to_string(), "value 1".to_string());
        input_config
            .properties
            .insert("key2".to_string(), "value 2".to_string());

        parquet_options
            .crypto
            .configure_factory("example_factory", &input_config);

        assert_eq!(
            parquet_options.crypto.factory_id,
            Some("example_factory".to_string())
        );
        let factory_options = &parquet_options.crypto.factory_options.options;
        assert_eq!(factory_options.len(), 2);
        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(entries
            .iter()
            .any(|item| item.key == "format.bloom_filter_enabled::col1"))
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_parquet_options_config_entry() {
        let mut table_parquet_options = TableParquetOptions::new();
        table_parquet_options
            .set(
                "crypto.file_encryption.column_key_as_hex::double_field",
                "31323334353637383930313233343530",
            )
            .unwrap();
        let entries = table_parquet_options.entries();
        assert!(entries
            .iter()
            .any(|item| item.key
                == "crypto.file_encryption.column_key_as_hex::double_field"))
    }

    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // duplicate keys are overwritten
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
}