datafusion_common/config.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Runtime configuration, via [`ConfigOptions`]

use arrow_ipc::CompressionType;

#[cfg(feature = "parquet_encryption")]
use crate::encryption::{FileDecryptionProperties, FileEncryptionProperties};
use crate::error::_config_err;
use crate::format::{ExplainAnalyzeLevel, ExplainFormat};
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
#[cfg(feature = "parquet_encryption")]
use hex;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
#[cfg(feature = "parquet_encryption")]
use std::sync::Arc;

/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
///     /// Amazing config
///     pub struct MyConfig {
///         /// Field 1 doc
///         field1: String, transform = str::to_lowercase, default = "".to_string()
///
///         /// Field 2 doc
///         field2: usize, default = 232
///
///         /// Field 3 doc
///         field3: Option<usize>, default = None
///     }
/// }
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
///     /// Field 1 doc
///     field1: String,
///     /// Field 2 doc
///     field2: usize,
///     /// Field 3 doc
///     field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
///     fn set(&mut self, key: &str, value: &str) -> Result<()> {
///         let (key, rem) = key.split_once('.').unwrap_or((key, ""));
///         match key {
///             "field1" => {
///                 let value = str::to_lowercase(value);
///                 self.field1.set(rem, value.as_ref())
///             },
///             "field2" => self.field2.set(rem, value.as_ref()),
///             "field3" => self.field3.set(rem, value.as_ref()),
///             _ => _internal_err!(
///                 "Config value \"{}\" not found on MyConfig",
///                 key
///             ),
///         }
///     }
///
///     fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
///         let key = format!("{}.field1", key_prefix);
///         let desc = "Field 1 doc";
///         self.field1.visit(v, key.as_str(), desc);
///         let key = format!("{}.field2", key_prefix);
///         let desc = "Field 2 doc";
///         self.field2.visit(v, key.as_str(), desc);
///         let key = format!("{}.field3", key_prefix);
///         let desc = "Field 3 doc";
///         self.field3.visit(v, key.as_str(), desc);
///     }
/// }
///
/// impl Default for MyConfig {
///     fn default() -> Self {
///         Self {
///             field1: "".to_string(),
///             field2: 232,
///             field3: None,
///         }
///     }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
    (
        $(#[doc = $struct_d:tt])* // Struct-level documentation attributes
        $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
        $(#[allow($($struct_de:tt)*)])?
        $vis:vis struct $struct_name:ident {
            $(
                $(#[doc = $d:tt])* // Field-level documentation attributes
                $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
                $(#[allow($($field_de:tt)*)])?
                $field_vis:vis $field_name:ident : $field_type:ty,
                $(warn = $warn:expr,)?
                $(transform = $transform:expr,)?
                default = $default:expr
            )*$(,)*
        }
    ) => {
        $(#[doc = $struct_d])* // Apply struct documentation
        $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
        $(#[allow($($struct_de)*)])?
        #[derive(Debug, Clone, PartialEq)]
        $vis struct $struct_name {
            $(
                $(#[doc = $d])* // Apply field documentation
                $(#[deprecated($($field_depr)*)])? // Apply field deprecation
                $(#[allow($($field_de)*)])?
                $field_vis $field_name: $field_type,
            )*
        }

        impl $crate::config::ConfigField for $struct_name {
            fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
                let (key, rem) = key.split_once('.').unwrap_or((key, ""));
                match key {
                    $(
                        stringify!($field_name) => {
                            // Safely apply deprecated attribute if present
                            // $(#[allow(deprecated)])?
                            {
                                $(let value = $transform(value);)? // Apply transformation if specified
                                #[allow(deprecated)]
                                let ret = self.$field_name.set(rem, value.as_ref());

                                $(if !$warn.is_empty() {
                                    let default: $field_type = $default;
                                    #[allow(deprecated)]
                                    if default != self.$field_name {
                                        log::warn!($warn);
                                    }
                                })? // Log warning if specified, and the value is not the default
                                ret
                            }
                        },
                    )*
                    _ => return $crate::error::_config_err!(
                        "Config value \"{}\" not found on {}", key, stringify!($struct_name)
                    )
                }
            }

            fn visit<V: $crate::config::Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
                $(
                    let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
                    let desc = concat!($($d),*).trim();
                    #[allow(deprecated)]
                    self.$field_name.visit(v, key.as_str(), desc);
                )*
            }
        }
        impl Default for $struct_name {
            fn default() -> Self {
                #[allow(deprecated)]
                Self {
                    $($field_name: $default),*
                }
            }
        }
    }
}

config_namespace! {
    /// Options related to catalog and directory scanning
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
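    ///
    /// A minimal sketch of setting an option in this namespace (keys are
    /// addressed as `datafusion.catalog.<field>`):
    ///
    /// ```ignore
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut options = ConfigOptions::new();
    /// options.set("datafusion.catalog.information_schema", "true")?;
    /// assert!(options.catalog.information_schema);
    /// ```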
    pub struct CatalogOptions {
        /// Whether the default catalog and schema should be created automatically.
        pub create_default_catalog_and_schema: bool, default = true

        /// The default catalog name - this impacts what SQL queries use if not specified
        pub default_catalog: String, default = "datafusion".to_string()

        /// The default schema name - this impacts what SQL queries use if not specified
        pub default_schema: String, default = "public".to_string()

        /// Should DataFusion provide access to `information_schema`
        /// virtual tables for displaying schema information
        pub information_schema: bool, default = false

        /// Location scanned to load tables for `default` schema
        pub location: Option<String>, default = None

        /// Type of `TableProvider` to use when loading `default` schema
        pub format: Option<String>, default = None

        /// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        pub has_header: bool, default = true

        /// Specifies whether newlines in (quoted) CSV values are supported.
        ///
        /// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
        /// if not specified explicitly in the statement.
        ///
        /// Parsing newlines in quoted values may be affected by execution behaviour such as
        /// parallel file scanning. Setting this to `true` ensures that newlines in values are
        /// parsed successfully, which may reduce performance.
        pub newlines_in_values: bool, default = false
    }
}

config_namespace! {
    /// Options related to SQL parser
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct SqlParserOptions {
        /// When set to true, SQL parser will parse float as decimal type
        pub parse_float_as_decimal: bool, default = false

        /// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
        pub enable_ident_normalization: bool, default = true

        /// When set to true, SQL parser will normalize options value (convert value to lowercase).
        /// Note that this option is ignored and will be removed in the future. All case-insensitive values
        /// are normalized automatically.
        pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false

        /// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
        /// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB and Databricks.
        pub dialect: Dialect, default = Dialect::Generic
        // no need to lowercase because [`sqlparser::dialect_from_str`] is case-insensitive

        /// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
        /// ignore the length. If false, error if a `VARCHAR` with a length is
        /// specified. The Arrow type system does not have a notion of maximum
        /// string length and thus DataFusion can not enforce such limits.
        pub support_varchar_with_length: bool, default = true

        /// If true, string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
        /// If false, they are mapped to `Utf8`.
        /// Default is true.
        pub map_string_types_to_utf8view: bool, default = true

        /// When set to true, the source locations relative to the original SQL
        /// query (i.e. [`Span`](https://docs.rs/sqlparser/latest/sqlparser/tokenizer/struct.Span.html)) will be collected
        /// and recorded in the logical plan nodes.
        pub collect_spans: bool, default = false

        /// Specifies the recursion depth limit when parsing complex SQL queries
        pub recursion_limit: usize, default = 50

        /// Specifies the default null ordering for query results. There are 4 options:
        /// - `nulls_max`: Nulls appear last in ascending order.
        /// - `nulls_min`: Nulls appear first in ascending order.
        /// - `nulls_first`: Nulls always appear first, in any order.
        /// - `nulls_last`: Nulls always appear last, in any order.
        ///
        /// By default, `nulls_max` is used to follow Postgres's behavior.
        /// Postgres rule: <https://www.postgresql.org/docs/current/queries-order.html>
        pub default_null_ordering: String, default = "nulls_max".to_string()
    }
}

/// This is the SQL dialect used by DataFusion's parser.
/// This mirrors the [sqlparser::dialect::Dialect](https://docs.rs/sqlparser/latest/sqlparser/dialect/trait.Dialect.html)
/// trait in order to offer an easier API and avoid adding the `sqlparser` dependency
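///
/// # Example
///
/// Parsing is case-insensitive, and `postgres` is accepted as an alias:
///
/// ```
/// use std::str::FromStr;
/// use datafusion_common::config::Dialect;
///
/// assert_eq!(Dialect::from_str("PostgreSQL").unwrap(), Dialect::PostgreSQL);
/// assert_eq!(Dialect::from_str("postgres").unwrap(), Dialect::PostgreSQL);
/// ```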
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Dialect {
    #[default]
    Generic,
    MySQL,
    PostgreSQL,
    Hive,
    SQLite,
    Snowflake,
    Redshift,
    MsSQL,
    ClickHouse,
    BigQuery,
    Ansi,
    DuckDB,
    Databricks,
}

impl AsRef<str> for Dialect {
    fn as_ref(&self) -> &str {
        match self {
            Self::Generic => "generic",
            Self::MySQL => "mysql",
            Self::PostgreSQL => "postgresql",
            Self::Hive => "hive",
            Self::SQLite => "sqlite",
            Self::Snowflake => "snowflake",
            Self::Redshift => "redshift",
            Self::MsSQL => "mssql",
            Self::ClickHouse => "clickhouse",
            Self::BigQuery => "bigquery",
            Self::Ansi => "ansi",
            Self::DuckDB => "duckdb",
            Self::Databricks => "databricks",
        }
    }
}

impl FromStr for Dialect {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let value = match s.to_ascii_lowercase().as_str() {
            "generic" => Self::Generic,
            "mysql" => Self::MySQL,
            "postgresql" | "postgres" => Self::PostgreSQL,
            "hive" => Self::Hive,
            "sqlite" => Self::SQLite,
            "snowflake" => Self::Snowflake,
            "redshift" => Self::Redshift,
            "mssql" => Self::MsSQL,
            "clickhouse" => Self::ClickHouse,
            "bigquery" => Self::BigQuery,
            "ansi" => Self::Ansi,
            "duckdb" => Self::DuckDB,
            "databricks" => Self::Databricks,
            other => {
                let error_message = format!(
                    "Invalid Dialect: {other}. Expected one of: Generic, MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, Ansi, DuckDB, Databricks"
                );
                return Err(DataFusionError::Configuration(error_message));
            }
        };
        Ok(value)
    }
}

impl ConfigField for Dialect {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = Self::from_str(value)?;
        Ok(())
    }
}

impl Display for Dialect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = self.as_ref();
        write!(f, "{str}")
    }
}

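/// Compression codec to apply when spilling query data to disk.
///
/// Spill files are written in the Arrow IPC Stream format, so only codecs
/// supported by the Arrow IPC writer are offered here. A small example of
/// how values are parsed:
///
/// ```
/// use std::str::FromStr;
/// use datafusion_common::config::SpillCompression;
///
/// // Parsing is case-insensitive; the empty string maps to `Uncompressed`
/// assert_eq!(SpillCompression::from_str("ZSTD").unwrap(), SpillCompression::Zstd);
/// assert_eq!(SpillCompression::from_str("").unwrap(), SpillCompression::Uncompressed);
/// ```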
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum SpillCompression {
    Zstd,
    Lz4Frame,
    #[default]
    Uncompressed,
}

impl FromStr for SpillCompression {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.to_ascii_lowercase().as_str() {
            "zstd" => Ok(Self::Zstd),
            "lz4_frame" => Ok(Self::Lz4Frame),
            "uncompressed" | "" => Ok(Self::Uncompressed),
            other => Err(DataFusionError::Configuration(format!(
                "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed"
            ))),
        }
    }
}

impl ConfigField for SpillCompression {
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        v.some(key, self, description)
    }

    fn set(&mut self, _: &str, value: &str) -> Result<()> {
        *self = SpillCompression::from_str(value)?;
        Ok(())
    }
}

impl Display for SpillCompression {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let str = match self {
            Self::Zstd => "zstd",
            Self::Lz4Frame => "lz4_frame",
            Self::Uncompressed => "uncompressed",
        };
        write!(f, "{str}")
    }
}

impl From<SpillCompression> for Option<CompressionType> {
    fn from(c: SpillCompression) -> Self {
        match c {
            SpillCompression::Zstd => Some(CompressionType::ZSTD),
            SpillCompression::Lz4Frame => Some(CompressionType::LZ4_FRAME),
            SpillCompression::Uncompressed => None,
        }
    }
}

config_namespace! {
    /// Options related to query execution
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
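    ///
    /// A minimal sketch showing the `target_partitions` normalization (a value
    /// of `0` resolves to the number of available CPU cores):
    ///
    /// ```ignore
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut options = ConfigOptions::new();
    /// options.set("datafusion.execution.target_partitions", "0")?;
    /// assert!(options.execution.target_partitions > 0);
    /// ```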
    pub struct ExecutionOptions {
        /// Default batch size while creating new batches. It's especially useful for
        /// buffer-in-memory batches, since creating tiny batches would result in too much
        /// metadata memory consumption
        pub batch_size: usize, default = 8192

        /// When set to true, record batches will be examined between each operator and
        /// small batches will be coalesced into larger batches. This is helpful when there
        /// are highly selective filters or joins that could produce tiny output batches. The
        /// target batch size is determined by the `batch_size` configuration setting
        pub coalesce_batches: bool, default = true

        /// Should DataFusion collect statistics when first creating a table.
        /// Has no effect after the table is created. Applies to the default
        /// `ListingTableProvider` in DataFusion. Defaults to true.
        pub collect_statistics: bool, default = true

        /// Number of partitions for query execution. Increasing partitions can increase
        /// concurrency.
        ///
        /// Defaults to the number of CPU cores on the system
        pub target_partitions: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// The default time zone
        ///
        /// Some functions, e.g. `now` return timestamps in this time zone
        pub time_zone: Option<String>, default = None

        /// Parquet options
        pub parquet: ParquetOptions, default = Default::default()

        /// Fan-out during initial physical planning.
        ///
        /// This is mostly used to plan `UNION` children in parallel.
        ///
        /// Defaults to the number of CPU cores on the system
        pub planning_concurrency: usize, transform = ExecutionOptions::normalized_parallelism, default = get_available_parallelism()

        /// When set to true, skips verifying that the schema produced by
        /// planning the input of `LogicalPlan::Aggregate` exactly matches the
        /// schema of the input plan.
        ///
        /// When set to false, if the schema does not match exactly
        /// (including nullability and metadata), a planning error will be raised.
        ///
        /// This is used to work around bugs in the planner that are now caught by
        /// the new schema verification step.
        pub skip_physical_aggregate_schema_check: bool, default = false

        /// Sets the compression codec used when spilling data to disk.
        ///
        /// Since DataFusion writes spill files using the Arrow IPC Stream format,
        /// only codecs supported by the Arrow IPC Stream Writer are allowed.
        /// Valid values are: uncompressed, lz4_frame, zstd.
        /// Note: lz4_frame offers faster (de)compression, but typically results in
        /// larger spill files. In contrast, zstd achieves
        /// higher compression ratios at the cost of slower (de)compression speed.
        pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed

        /// Specifies the reserved memory for each spillable sort operation to
        /// facilitate an in-memory merge.
        ///
        /// When a sort operation spills to disk, the in-memory data must be
        /// sorted and merged before being written to a file. This setting reserves
        /// a specific amount of memory for that in-memory sort/merge process.
        ///
        /// Note: This setting is irrelevant if the sort operation cannot spill
        /// (i.e., if there's no `DiskManager` configured).
        pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024

        /// When sorting, below what size should data be concatenated
        /// and sorted in a single RecordBatch rather than sorted in
        /// batches and merged.
        pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024

        /// Number of files to read in parallel when inferring schema and statistics
        pub meta_fetch_concurrency: usize, default = 32

        /// Guarantees a minimum number of output files written in parallel.
        /// RecordBatches will be distributed in round-robin fashion to each
        /// parallel writer. Each writer is closed and a new file opened once
        /// soft_max_rows_per_output_file is reached.
        pub minimum_parallel_output_files: usize, default = 4

        /// Target number of rows in output files when writing multiple.
        /// This is a soft max, so it can be exceeded slightly. There also
        /// will be one file smaller than the limit if the total
        /// number of rows written is not roughly divisible by the soft max
        pub soft_max_rows_per_output_file: usize, default = 50000000

        /// This is the maximum number of RecordBatches buffered
        /// for each output file being worked. Higher values can potentially
        /// give faster write performance at the cost of higher peak
        /// memory consumption
        pub max_buffered_batches_per_output_file: usize, default = 2

        /// Should sub directories be ignored when scanning directories for data
        /// files. Defaults to true (ignores subdirectories), consistent with
        /// Hive. Note that this setting does not affect reading partitioned
        /// tables (e.g. `/table/year=2021/month=01/data.parquet`).
        pub listing_table_ignore_subdirectory: bool, default = true

        /// Should a `ListingTable` created through the `ListingTableFactory` infer table
        /// partitions from Hive compliant directories. Defaults to true (partition columns are
        /// inferred and will be represented in the table schema).
        pub listing_table_factory_infer_partitions: bool, default = true

        /// Should DataFusion support recursive CTEs
        pub enable_recursive_ctes: bool, default = true

        /// Attempt to eliminate sorts by packing & sorting files with non-overlapping
        /// statistics into the same file groups.
        /// Currently experimental
        pub split_file_groups_by_statistics: bool, default = false

        /// Should DataFusion keep the columns used for partition_by in the output RecordBatches
        pub keep_partition_by_columns: bool, default = false

        /// Aggregation ratio (number of distinct groups / number of input rows)
        /// threshold for skipping partial aggregation. If the ratio exceeds this
        /// threshold, partial aggregation will skip aggregation for further input rows
        pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8

        /// Number of input rows that a partial aggregation partition should process
        /// before checking the aggregation ratio and, if it is exceeded, switching to
        /// skip-aggregation mode
        pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000

        /// Should DataFusion use row number estimates at the input to decide
        /// whether increasing parallelism is beneficial or not. By default,
        /// only exact row numbers (not estimates) are used for this decision.
        /// Setting this flag to `true` will likely produce better plans
        /// if the source of statistics is accurate.
        /// We plan to make this the default in the future.
        pub use_row_number_estimates_to_optimize_partitioning: bool, default = false

        /// Should DataFusion enforce batch size in joins or not. By default,
        /// DataFusion will not enforce batch size in joins. Enforcing batch size
        /// in joins can reduce memory usage when joining large
        /// tables with a highly-selective join filter, but is also slightly slower.
        pub enforce_batch_size_in_joins: bool, default = false

        /// Size (bytes) of data buffer DataFusion uses when writing output files.
        /// This affects the size of the data chunks that are uploaded to remote
        /// object stores (e.g. AWS S3). If very large (>= 100 GiB) output files are being
        /// written, it may be necessary to increase this size to avoid errors from
        /// the remote end point.
        pub objectstore_writer_buffer_size: usize, default = 10 * 1024 * 1024
    }
}

config_namespace! {
    /// Options for reading and writing parquet files
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
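    ///
    /// A minimal sketch of setting a writer option in this namespace (these
    /// options live under the `datafusion.execution.parquet` prefix):
    ///
    /// ```ignore
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut options = ConfigOptions::new();
    /// // Codec names are case-insensitive; compression levels use `codec(level)`
    /// options.set("datafusion.execution.parquet.compression", "zstd(10)")?;
    /// ```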
    pub struct ParquetOptions {
        // The following options affect reading parquet files

        /// (reading) If true, reads the Parquet data page level metadata (the
        /// Page Index), if present, to reduce the I/O and number of
        /// rows decoded.
        pub enable_page_index: bool, default = true

        /// (reading) If true, the parquet reader attempts to skip entire row groups based
        /// on the predicate in the query and the metadata (min/max values) stored in
        /// the parquet file
        pub pruning: bool, default = true

        /// (reading) If true, the parquet reader skips the optional embedded metadata that may be in
        /// the file Schema. This setting can help avoid schema conflicts when querying
        /// multiple parquet files with schemas containing compatible types but different metadata
        pub skip_metadata: bool, default = true

        /// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
        /// bytes of the parquet file optimistically. If not specified, two reads are required:
        /// one read to fetch the 8-byte parquet footer and
        /// another to fetch the metadata length encoded in the footer.
        /// The default of 512 KiB should be sufficient for most parquet files,
        /// and can save one I/O operation per parquet file. If the metadata is larger than
        /// the hint, two reads will still be performed.
        pub metadata_size_hint: Option<usize>, default = Some(512 * 1024)

        /// (reading) If true, filter expressions are applied during the parquet decoding operation to
        /// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
        pub pushdown_filters: bool, default = false

        /// (reading) If true, filter expressions evaluated during the parquet decoding operation
        /// will be reordered heuristically to minimize the cost of evaluation. If false,
        /// the filters are applied in the same order as written in the query
        pub reorder_filters: bool, default = false

        /// (reading) If true, parquet reader will read columns of `Utf8/LargeUtf8` with `Utf8View`,
        /// and `Binary/LargeBinary` with `BinaryView`.
        pub schema_force_view_types: bool, default = true

        /// (reading) If true, parquet reader will read columns of
        /// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
        ///
        /// Parquet files generated by some legacy writers do not correctly set
        /// the UTF8 flag for strings, causing string columns to be loaded as
        /// BLOB instead.
        pub binary_as_string: bool, default = false

        /// (reading) If true, parquet reader will read columns of
        /// physical type int96 as originating from a different resolution
        /// than nanosecond. This is useful for reading data from systems like Spark
        /// which stores microsecond resolution timestamps in an int96 allowing it
        /// to write values with a larger date range than 64-bit timestamps with
        /// nanosecond resolution.
        pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None

        /// (reading) Use any available bloom filters when reading parquet files
        pub bloom_filter_on_read: bool, default = true

        /// (reading) The maximum predicate cache size, in bytes. When
        /// `pushdown_filters` is enabled, sets the maximum memory used to cache
        /// the results of predicate evaluation between filter evaluation and
        /// output generation. Decreasing this value will reduce memory usage,
        /// but may increase IO and CPU usage. None means use the default
        /// parquet reader setting. 0 means no caching.
        pub max_predicate_cache_size: Option<usize>, default = None

        // The following options affect writing to parquet files
        // and map to parquet::file::properties::WriterProperties

        /// (writing) Sets best effort maximum size of data page in bytes
        pub data_pagesize_limit: usize, default = 1024 * 1024

        /// (writing) Sets write_batch_size in bytes
        pub write_batch_size: usize, default = 1024

        /// (writing) Sets parquet writer version
        /// valid values are "1.0" and "2.0"
        pub writer_version: String, default = "1.0".to_string()

        /// (writing) Skip encoding the embedded arrow metadata in the KV_meta
        ///
        /// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
        /// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
        pub skip_arrow_metadata: bool, default = false

        /// (writing) Sets default parquet compression codec.
        /// Valid values are: uncompressed, snappy, gzip(level),
        /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        ///
        /// Note that this default setting is not the same as
        /// the default parquet writer setting.
        pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())

        /// (writing) Sets if dictionary encoding is enabled. If NULL, uses
        /// default parquet writer setting
        pub dictionary_enabled: Option<bool>, default = Some(true)

        /// (writing) Sets best effort maximum dictionary page size, in bytes
        pub dictionary_page_size_limit: usize, default = 1024 * 1024

        /// (writing) Sets if statistics are enabled for any column
        /// Valid values are: "none", "chunk", and "page"
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())

        /// (writing) Target maximum number of rows in each row group (defaults to 1M
        /// rows). Writing larger row groups requires more memory to write, but
        /// can get better compression and be faster to read.
        pub max_row_group_size: usize, default = 1024 * 1024

        /// (writing) Sets "created by" property
        pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

        /// (writing) Sets column index truncate length
        pub column_index_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets statistics truncate length. If NULL, uses
        /// default parquet writer setting
        pub statistics_truncate_length: Option<usize>, default = Some(64)

        /// (writing) Sets best effort maximum number of rows in data page
        pub data_page_row_count_limit: usize, default = 20_000

        /// (writing) Sets default encoding for any column.
        /// Valid values are: plain, plain_dictionary, rle,
        /// bit_packed, delta_binary_packed, delta_length_byte_array,
        /// delta_byte_array, rle_dictionary, and byte_stream_split.
        /// These values are not case sensitive. If NULL, uses
        /// default parquet writer setting
        pub encoding: Option<String>, transform = str::to_lowercase, default = None

        /// (writing) Write bloom filters for all columns when creating parquet files
        pub bloom_filter_on_write: bool, default = false

        /// (writing) Sets bloom filter false positive probability. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_fpp: Option<f64>, default = None

        /// (writing) Sets bloom filter number of distinct values. If NULL, uses
        /// default parquet writer setting
        pub bloom_filter_ndv: Option<u64>, default = None

        /// (writing) Controls whether DataFusion will attempt to speed up writing
        /// parquet files by serializing them in parallel. Each column
        /// in each row group in each output file is serialized in parallel,
        /// leveraging a maximum possible core count of n_files * n_row_groups * n_columns.
        pub allow_single_file_parallelism: bool, default = true

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_parallel_row_group_writers: usize, default = 1

        /// (writing) By default parallel parquet writer is tuned for minimum
        /// memory usage in a streaming execution plan. You may see
        /// a performance benefit when writing large parquet files
        /// by increasing maximum_parallel_row_group_writers and
        /// maximum_buffered_record_batches_per_stream if your system
        /// has idle cores and can tolerate additional memory usage.
        /// Boosting these values is likely worthwhile when
        /// writing out already in-memory data, such as from a cached
        /// data frame.
        pub maximum_buffered_record_batches_per_stream: usize, default = 2
    }
}

config_namespace! {
    /// Options for configuring Parquet Modular Encryption
    ///
    /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    pub struct ParquetEncryptionOptions {
        /// Optional file decryption properties
        pub file_decryption: Option<ConfigFileDecryptionProperties>, default = None

        /// Optional file encryption properties
        pub file_encryption: Option<ConfigFileEncryptionProperties>, default = None

        /// Identifier for the encryption factory to use to create file encryption and decryption properties.
        /// Encryption factories can be registered in the runtime environment with
        /// `RuntimeEnv::register_parquet_encryption_factory`.
        pub factory_id: Option<String>, default = None

        /// Any encryption factory specific options
        pub factory_options: EncryptionFactoryOptions, default = EncryptionFactoryOptions::default()
    }
}

impl ParquetEncryptionOptions {
    /// Specify the encryption factory to use for Parquet modular encryption, along with its configuration
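    ///
    /// A hedged sketch, where `MyFactoryConfig` stands in for a user-defined
    /// [`ExtensionOptions`] implementation (hypothetical name):
    ///
    /// ```ignore
    /// let mut options = ParquetEncryptionOptions::default();
    /// options.configure_factory("my_factory", &MyFactoryConfig::default());
    /// assert_eq!(options.factory_id.as_deref(), Some("my_factory"));
    /// ```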
    pub fn configure_factory(
        &mut self,
        factory_id: &str,
        config: &impl ExtensionOptions,
    ) {
        self.factory_id = Some(factory_id.to_owned());
        self.factory_options.options.clear();
        for entry in config.entries() {
            if let Some(value) = entry.value {
                self.factory_options.options.insert(entry.key, value);
            }
        }
    }
}

config_namespace! {
    /// Options related to query optimization
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct OptimizerOptions {
        /// When set to true, the optimizer will push a limit operation into
        /// grouped aggregations which have no aggregate expressions, as a soft limit,
        /// emitting groups once the limit is reached, before all rows in the group are read.
        pub enable_distinct_aggregation_soft_limit: bool, default = true

        /// When set to true, the physical plan optimizer will try to add round robin
        /// repartitioning to increase parallelism to leverage more CPU cores
        pub enable_round_robin_repartition: bool, default = true

        /// When set to true, the optimizer will attempt to perform limit operations
        /// during aggregations, if possible
        pub enable_topk_aggregation: bool, default = true

        /// When set to true, the optimizer will attempt to push limit operations
        /// past window functions, if possible
        pub enable_window_limits: bool, default = true

        /// When set to true, the optimizer will attempt to push down TopK dynamic filters
        /// into the file scan phase.
        pub enable_topk_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will attempt to push down Join dynamic filters
        /// into the file scan phase.
        pub enable_join_dynamic_filter_pushdown: bool, default = true

        /// When set to true, attempts to push down dynamic filters generated by operators (TopK & Join) into the file scan phase.
        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
        /// This means that if we already have 10 timestamps in the year 2025,
        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
        /// This config overrides `enable_join_dynamic_filter_pushdown` and `enable_topk_dynamic_filter_pushdown`:
        /// if you disable `enable_topk_dynamic_filter_pushdown` and then set `enable_dynamic_filter_pushdown`, the former is overwritten.
        pub enable_dynamic_filter_pushdown: bool, default = true

        /// When set to true, the optimizer will insert filters before a join between
        /// a nullable and non-nullable column to filter out nulls on the nullable side. This
        /// filter can add additional overhead when the file format does not fully support
        /// predicate push down.
        pub filter_null_join_keys: bool, default = false

        /// Should DataFusion repartition data using the aggregate keys to execute aggregates
        /// in parallel using the provided `target_partitions` level
        pub repartition_aggregations: bool, default = true

        /// Minimum total files size in bytes to perform file scan repartitioning.
        pub repartition_file_min_size: usize, default = 10 * 1024 * 1024

        /// Should DataFusion repartition data using the join keys to execute joins in parallel
        /// using the provided `target_partitions` level
        pub repartition_joins: bool, default = true

        /// Should DataFusion allow symmetric hash joins for unbounded data sources even when
        /// its inputs do not have any ordering or filtering. If the flag is not enabled,
        /// the SymmetricHashJoin operator will be unable to prune its internal buffers,
        /// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
        /// RightAnti, and RightSemi - being produced only at the end of the execution.
        /// This is not typical in stream processing. Additionally, without proper design for
        /// long-running execution, all types of joins may encounter out-of-memory errors.
        pub allow_symmetric_joins_without_pruning: bool, default = true

        /// When set to `true`, datasource partitions will be repartitioned to achieve maximum parallelism.
        /// This applies to both in-memory partitions and FileSource's file groups (1 group is 1 partition).
        ///
        /// For FileSources, only Parquet and CSV formats are currently supported.
        ///
        /// If set to `true` for a FileSource, all files will be repartitioned evenly (i.e., a single large file
        /// might be partitioned into smaller chunks) for parallel scanning.
        /// If set to `false` for a FileSource, different files will be read in parallel, but repartitioning won't
        /// happen within a single file.
        ///
        /// If set to `true` for an in-memory source, all memtable's partitions will have their batches
        /// repartitioned evenly to the desired number of `target_partitions`. Repartitioning can change
        /// the total number of partitions and batches per partition, but does not slice the initial
        /// record tables provided to the MemTable on creation.
        pub repartition_file_scans: bool, default = true

        /// Should DataFusion repartition data using the partitions keys to execute window
        /// functions in parallel using the provided `target_partitions` level
        pub repartition_windows: bool, default = true

        /// Should DataFusion execute sorts in a per-partition fashion and merge
        /// afterwards, instead of coalescing first and sorting globally.
        /// When this flag is enabled, plans in the form below
        ///
        /// ```text
        /// "SortExec: [a@0 ASC]",
        /// "  CoalescePartitionsExec",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        /// would turn into the plan below, which performs better in multithreaded environments
        ///
        /// ```text
        /// "SortPreservingMergeExec: [a@0 ASC]",
        /// "  SortExec: [a@0 ASC]",
        /// "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
        /// ```
        pub repartition_sorts: bool, default = true

        /// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
        /// (i.e. setting `preserve_order` to true on `RepartitionExec` and
        /// using `SortPreservingMergeExec`)
        ///
        /// When false, DataFusion will maximize plan parallelism using
        /// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
        pub prefer_existing_sort: bool, default = false

        /// When set to true, the logical plan optimizer will produce warning
        /// messages if any optimization rules produce errors and then proceed to the next
        /// rule. When set to false, any rules that produce errors will cause the query to fail
        pub skip_failed_rules: bool, default = false

        /// Number of times that the optimizer will attempt to optimize the plan
        pub max_passes: usize, default = 3

        /// When set to true, the physical plan optimizer will run a top down
        /// process to reorder the join keys
        pub top_down_join_key_reordering: bool, default = true

        /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
        /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
        pub prefer_hash_join: bool, default = true

        /// When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently
        /// experimental. Physical planner will opt for PiecewiseMergeJoin when there is only
        /// one range filter.
        pub enable_piecewise_merge_join: bool, default = false

        /// The maximum estimated size, in bytes, under which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold: usize, default = 1024 * 1024

        /// The maximum estimated size, in rows, under which one input side of a HashJoin
        /// will be collected into a single partition
        pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128

        /// The default filter selectivity used by Filter Statistics
        /// when an exact selectivity cannot be determined. Valid values are
        /// between 0 (no selectivity) and 100 (all rows are selected).
        pub default_filter_selectivity: u8, default = 20

        /// When set to true, the optimizer will not attempt to convert Union to Interleave
        pub prefer_existing_union: bool, default = false

        /// When set to true, if the returned type is a view type
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
    }
}

config_namespace! {
    /// Options controlling explain output
    ///
    /// See also: [`SessionConfig`]
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
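    ///
    /// A minimal sketch of switching the explain format:
    ///
    /// ```ignore
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut options = ConfigOptions::new();
    /// options.set("datafusion.explain.format", "tree")?;
    /// ```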
    pub struct ExplainOptions {
        /// When set to true, the explain statement will only print logical plans
        pub logical_plan_only: bool, default = false

        /// When set to true, the explain statement will only print physical plans
        pub physical_plan_only: bool, default = false

        /// When set to true, the explain statement will print operator statistics
        /// for physical plans
        pub show_statistics: bool, default = false

        /// When set to true, the explain statement will print the partition sizes
        pub show_sizes: bool, default = true

        /// When set to true, the explain statement will print schema information
        pub show_schema: bool, default = false

        /// Display format of explain. Default is "indent".
        /// When set to "tree", it will print the plan in a tree-rendered format.
        pub format: ExplainFormat, default = ExplainFormat::Indent

        /// (format=tree only) Maximum total width of the rendered tree.
        /// When set to 0, the tree will have no width limit.
        pub tree_maximum_render_width: usize, default = 240

        /// Verbosity level for "EXPLAIN ANALYZE". Default is "dev".
        /// "summary" shows common metrics for high-level insights;
        /// "dev" provides deep operator-level introspection for developers.
        pub analyze_level: ExplainAnalyzeLevel, default = ExplainAnalyzeLevel::Dev
    }
}

impl ExecutionOptions {
    /// Returns the correct parallelism based on the provided `value`.
    /// If `value` is `"0"`, returns the default available parallelism, computed with
    /// `get_available_parallelism`. Otherwise, returns `value`.
    fn normalized_parallelism(value: &str) -> String {
        if value.parse::<usize>() == Ok(0) {
            get_available_parallelism().to_string()
        } else {
            value.to_owned()
        }
    }
}

config_namespace! {
    /// Options controlling the format of output when printing record batches
    /// Copies [`arrow::util::display::FormatOptions`]
    pub struct FormatOptions {
        /// If set to `true` any formatting errors will be written to the output
        /// instead of being converted into a [`std::fmt::Error`]
        pub safe: bool, default = true
        /// Format string for nulls
        pub null: String, default = "".into()
        /// Date format for date arrays
        pub date_format: Option<String>, default = Some("%Y-%m-%d".to_string())
        /// Format for DateTime arrays
        pub datetime_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp arrays
        pub timestamp_format: Option<String>, default = Some("%Y-%m-%dT%H:%M:%S%.f".to_string())
        /// Timestamp format for timestamp with timezone arrays. When `None`, ISO 8601 format is used.
        pub timestamp_tz_format: Option<String>, default = None
        /// Time format for time arrays
        pub time_format: Option<String>, default = Some("%H:%M:%S%.f".to_string())
        /// Duration format. Can be either `"pretty"` or `"ISO8601"`
        pub duration_format: String, transform = str::to_lowercase, default = "pretty".into()
        /// Show types in visual representation batches
        pub types_info: bool, default = false
    }
}

impl<'a> TryInto<arrow::util::display::FormatOptions<'a>> for &'a FormatOptions {
    type Error = DataFusionError;
    fn try_into(self) -> Result<arrow::util::display::FormatOptions<'a>> {
        let duration_format = match self.duration_format.as_str() {
            "pretty" => arrow::util::display::DurationFormat::Pretty,
            "iso8601" => arrow::util::display::DurationFormat::ISO8601,
            _ => {
                return _config_err!(
                    "Invalid duration format: {}. Valid values are pretty or iso8601",
                    self.duration_format
                )
            }
        };

        Ok(arrow::util::display::FormatOptions::new()
            .with_display_error(self.safe)
            .with_null(&self.null)
            .with_date_format(self.date_format.as_deref())
            .with_datetime_format(self.datetime_format.as_deref())
            .with_timestamp_format(self.timestamp_format.as_deref())
            .with_timestamp_tz_format(self.timestamp_tz_format.as_deref())
            .with_time_format(self.time_format.as_deref())
            .with_duration_format(duration_format)
            .with_types_info(self.types_info))
    }
}

/// A key value pair, with a corresponding description
#[derive(Debug, Hash, PartialEq, Eq)]
pub struct ConfigEntry {
    /// A unique string to identify this config value
    pub key: String,

    /// The value if any
    pub value: Option<String>,

    /// A description of this configuration entry
    pub description: &'static str,
}

/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
    /// Catalog options
    pub catalog: CatalogOptions,
    /// Execution options
    pub execution: ExecutionOptions,
    /// Optimizer options
    pub optimizer: OptimizerOptions,
    /// SQL parser options
    pub sql_parser: SqlParserOptions,
    /// Explain options
    pub explain: ExplainOptions,
    /// Optional extensions registered using [`Extensions::insert`]
    pub extensions: Extensions,
    /// Formatting options when printing batches
    pub format: FormatOptions,
}

impl ConfigField for ConfigOptions {
    fn set(&mut self, key: &str, value: &str) -> Result<()> {
        // Extensions are handled in the public `ConfigOptions::set`
        let (key, rem) = key.split_once('.').unwrap_or((key, ""));
        match key {
            "catalog" => self.catalog.set(rem, value),
            "execution" => self.execution.set(rem, value),
            "optimizer" => self.optimizer.set(rem, value),
            "explain" => self.explain.set(rem, value),
            "sql_parser" => self.sql_parser.set(rem, value),
            "format" => self.format.set(rem, value),
            _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
        }
    }

    fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
        self.catalog.visit(v, "datafusion.catalog", "");
        self.execution.visit(v, "datafusion.execution", "");
        self.optimizer.visit(v, "datafusion.optimizer", "");
        self.explain.visit(v, "datafusion.explain", "");
        self.sql_parser.visit(v, "datafusion.sql_parser", "");
        self.format.visit(v, "datafusion.format", "");
    }
}

impl ConfigOptions {
    /// Creates a new [`ConfigOptions`] with default values
    pub fn new() -> Self {
        Self::default()
    }

    /// Set extensions to provided value
    pub fn with_extensions(mut self, extensions: Extensions) -> Self {
        self.extensions = extensions;
        self
    }

    /// Set a configuration option
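    ///
    /// For example, setting the umbrella `enable_dynamic_filter_pushdown` flag
    /// also updates its TopK and Join variants:
    ///
    /// ```
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let mut options = ConfigOptions::new();
    /// options
    ///     .set("datafusion.optimizer.enable_dynamic_filter_pushdown", "false")
    ///     .unwrap();
    /// assert!(!options.optimizer.enable_topk_dynamic_filter_pushdown);
    /// assert!(!options.optimizer.enable_join_dynamic_filter_pushdown);
    /// ```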
    pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
        let Some((prefix, key)) = key.split_once('.') else {
            return _config_err!("could not find config namespace for key \"{key}\"");
        };

        if prefix == "datafusion" {
            if key == "optimizer.enable_dynamic_filter_pushdown" {
                let bool_value = value.parse::<bool>().map_err(|e| {
                    DataFusionError::Configuration(format!(
                        "Failed to parse '{value}' as bool: {e}",
                    ))
                })?;

                // The umbrella flag also controls the TopK and Join variants
                self.optimizer.enable_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_topk_dynamic_filter_pushdown = bool_value;
                self.optimizer.enable_join_dynamic_filter_pushdown = bool_value;
                return Ok(());
            }
            return ConfigField::set(self, key, value);
        }

        let Some(e) = self.extensions.0.get_mut(prefix) else {
            return _config_err!("Could not find config namespace \"{prefix}\"");
        };
        e.0.set(key, value)
    }

    /// Create new [`ConfigOptions`], taking values from environment variables
    /// where possible.
    ///
    /// For example, to configure `datafusion.execution.batch_size`
    /// ([`ExecutionOptions::batch_size`]) you would set the
    /// `DATAFUSION_EXECUTION_BATCH_SIZE` environment variable.
    ///
    /// The name of the environment variable is the option's key, transformed to
    /// uppercase and with periods replaced with underscores.
    ///
    /// Values are parsed according to the [same rules used in casts from
    /// Utf8](https://docs.rs/arrow/latest/arrow/compute/kernels/cast/fn.cast.html).
    ///
    /// If the value in the environment variable cannot be cast to the type of
    /// the configuration option, the default value will be used instead and a
    /// warning emitted. Environment variables are read when this method is
    /// called, and are not re-read later.
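    ///
    /// A sketch of the naming convention:
    ///
    /// ```ignore
    /// // Equivalent to `SET datafusion.execution.batch_size = 1024`
    /// std::env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "1024");
    /// let options = ConfigOptions::from_env()?;
    /// assert_eq!(options.execution.batch_size, 1024);
    /// ```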
    pub fn from_env() -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        // Extract the names of all fields and then look up the corresponding
        // environment variables. This isn't hugely efficient but avoids
        // ambiguity between `a.b` and `a_b` which would both correspond
        // to an environment variable of `A_B`

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            let env = key.to_uppercase().replace('.', "_");
            if let Some(var) = std::env::var_os(env) {
                let value = var.to_string_lossy();
                log::info!("Set {key} to {value} from the environment variable");
                ret.set(&key, value.as_ref())?;
            }
        }

        Ok(ret)
    }

    /// Create new ConfigOptions struct, taking values from a string hash map.
    ///
    /// Only the built-in configurations will be extracted from the hash map
    /// and other key value pairs will be ignored.
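    ///
    /// For example:
    ///
    /// ```
    /// use std::collections::HashMap;
    /// use datafusion_common::config::ConfigOptions;
    ///
    /// let settings = HashMap::from([(
    ///     "datafusion.execution.batch_size".to_owned(),
    ///     "1024".to_owned(),
    /// )]);
    /// let options = ConfigOptions::from_string_hash_map(&settings).unwrap();
    /// assert_eq!(options.execution.batch_size, 1024);
    /// ```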
    pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
        struct Visitor(Vec<String>);

        impl Visit for Visitor {
            fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
                self.0.push(key.to_string())
            }

            fn none(&mut self, key: &str, _: &'static str) {
                self.0.push(key.to_string())
            }
        }

        let mut keys = Visitor(vec![]);
        let mut ret = Self::default();
        ret.visit(&mut keys, "datafusion", "");

        for key in keys.0 {
            if let Some(var) = settings.get(&key) {
                ret.set(&key, var)?;
            }
        }

        Ok(ret)
    }

    /// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
    pub fn entries(&self) -> Vec<ConfigEntry> {
        struct Visitor(Vec<ConfigEntry>);

        impl Visit for Visitor {
            fn some<V: Display>(
                &mut self,
                key: &str,
                value: V,
                description: &'static str,
            ) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: Some(value.to_string()),
                    description,
                })
            }

            fn none(&mut self, key: &str, description: &'static str) {
                self.0.push(ConfigEntry {
                    key: key.to_string(),
                    value: None,
                    description,
                })
            }
        }

        let mut v = Visitor(vec![]);
        self.visit(&mut v, "datafusion", "");

        v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
        v.0
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        use std::fmt::Write as _;

        let mut s = Self::default();

        // Normalize for display
        s.execution.target_partitions = 0;
        s.execution.planning_concurrency = 0;

        let mut docs = "| key | default | description |\n".to_string();
        docs += "|-----|---------|-------------|\n";
        let mut entries = s.entries();
        entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));

        for entry in entries {
            let _ = writeln!(
                &mut docs,
                "| {} | {} | {} |",
                entry.key,
                entry.value.as_deref().unwrap_or("NULL"),
                entry.description
            );
        }
        docs
    }
}

/// [`ConfigExtension`] provides a mechanism to store third-party configuration
/// within DataFusion [`ConfigOptions`]
///
/// This mechanism can be used to pass configuration to user defined functions
/// or optimizer passes
///
/// # Example
/// ```
/// use datafusion_common::{
///     config::ConfigExtension, config::ConfigOptions, extensions_options,
/// };
/// // Define a new configuration struct using the `extensions_options` macro
/// extensions_options! {
///     /// My own config options.
///     pub struct MyConfig {
///         /// Should "foo" be replaced by "bar"?
///         pub foo_to_bar: bool, default = true
///
///         /// How many "baz" should be created?
///         pub baz_count: usize, default = 1337
///     }
/// }
///
/// impl ConfigExtension for MyConfig {
///     const PREFIX: &'static str = "my_config";
/// }
///
/// // set up config struct and register extension
/// let mut config = ConfigOptions::default();
/// config.extensions.insert(MyConfig::default());
///
/// // overwrite config default
/// config.set("my_config.baz_count", "42").unwrap();
///
/// // check config state
/// let my_config = config.extensions.get::<MyConfig>().unwrap();
/// assert!(my_config.foo_to_bar);
/// assert_eq!(my_config.baz_count, 42);
/// ```
///
/// # Note:
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
    /// Configuration namespace prefix to use
    ///
    /// All values under this will be prefixed with `$PREFIX + "."`
    const PREFIX: &'static str;
}
1368
1369/// An object-safe API for storing arbitrary configuration.
1370///
1371/// See [`ConfigExtension`] for user defined configuration
1372pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
1373 /// Return `self` as [`Any`]
1374 ///
1375 /// This is needed until trait upcasting is stabilized
1376 fn as_any(&self) -> &dyn Any;
1377
1378 /// Return `self` as [`Any`]
1379 ///
1380 /// This is needed until trait upcasting is stabilized
1381 fn as_any_mut(&mut self) -> &mut dyn Any;
1382
1383 /// Return a deep clone of this [`ExtensionOptions`]
1384 ///
1385 /// It is important this does not share mutable state to avoid consistency issues
1386 /// with configuration changing whilst queries are executing
1387 fn cloned(&self) -> Box<dyn ExtensionOptions>;
1388
1389 /// Set the given `key`, `value` pair
1390 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1391
    /// Returns the [`ConfigEntry`] values stored in this [`ExtensionOptions`]
1393 fn entries(&self) -> Vec<ConfigEntry>;
1394}
1395
1396/// A type-safe container for [`ConfigExtension`]
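///
/// # Example
///
/// A minimal sketch of registering and retrieving an extension; `MyConfig` is
/// assumed to be defined via `extensions_options!` and to implement
/// [`ConfigExtension`], as in the [`ConfigExtension`] example.
///
/// ```ignore
/// let mut extensions = Extensions::new();
/// extensions.insert(MyConfig::default());
/// assert!(extensions.get::<MyConfig>().is_some());
/// ```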
1397#[derive(Debug, Default, Clone)]
1398pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
1399
1400impl Extensions {
1401 /// Create a new, empty [`Extensions`]
1402 pub fn new() -> Self {
1403 Self(BTreeMap::new())
1404 }
1405
    /// Registers a [`ConfigExtension`] with these [`Extensions`]
1407 pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
1408 assert_ne!(T::PREFIX, "datafusion");
1409 let e = ExtensionBox(Box::new(extension));
1410 self.0.insert(T::PREFIX, e);
1411 }
1412
1413 /// Retrieves the extension of the given type if any
1414 pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
1415 self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
1416 }
1417
    /// Retrieves a mutable reference to the extension of the given type if any
1419 pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
1420 let e = self.0.get_mut(T::PREFIX)?;
1421 e.0.as_any_mut().downcast_mut()
1422 }
1423}
1424
1425#[derive(Debug)]
1426struct ExtensionBox(Box<dyn ExtensionOptions>);
1427
1428impl Clone for ExtensionBox {
1429 fn clone(&self) -> Self {
1430 Self(self.0.cloned())
1431 }
1432}
1433
/// A trait implemented by structs generated by `config_namespace!` and by the
/// field types they contain, providing the ability to walk and mutate the
/// configuration tree
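///
/// # Example
///
/// A sketch of a manual implementation for a hypothetical wrapper type; most
/// implementations are generated by `config_namespace!` or [`config_field!`]
/// instead.
///
/// ```ignore
/// struct Percentage(f64);
///
/// impl ConfigField for Percentage {
///     fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
///         v.some(key, self.0, description)
///     }
///
///     fn set(&mut self, _key: &str, value: &str) -> Result<()> {
///         self.0 = default_config_transform(value)?;
///         Ok(())
///     }
/// }
/// ```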
1436pub trait ConfigField {
1437 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
1438
1439 fn set(&mut self, key: &str, value: &str) -> Result<()>;
1440}
1441
1442impl<F: ConfigField + Default> ConfigField for Option<F> {
1443 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1444 match self {
1445 Some(s) => s.visit(v, key, description),
1446 None => v.none(key, description),
1447 }
1448 }
1449
1450 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1451 self.get_or_insert_with(Default::default).set(key, value)
1452 }
1453}
1454
/// Default transformation to parse a [`ConfigField`] from a string.
1456///
1457/// This uses [`FromStr`] to parse the data.
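///
/// # Example
///
/// A small sketch: parse failures are wrapped in a [`DataFusionError`] naming
/// the target type.
///
/// ```ignore
/// let parsed: usize = default_config_transform("42")?;
/// assert_eq!(parsed, 42);
/// assert!(default_config_transform::<usize>("not-a-number").is_err());
/// ```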
1458pub fn default_config_transform<T>(input: &str) -> Result<T>
1459where
1460 T: FromStr,
1461 <T as FromStr>::Err: Sync + Send + Error + 'static,
1462{
1463 input.parse().map_err(|e| {
1464 DataFusionError::Context(
1465 format!(
1466 "Error parsing '{}' as {}",
1467 input,
1468 std::any::type_name::<T>()
1469 ),
1470 Box::new(DataFusionError::External(Box::new(e))),
1471 )
1472 })
1473}
1474
1475/// Macro that generates [`ConfigField`] for a given type.
1476///
1477/// # Usage
1478/// This always requires [`Display`] to be implemented for the given type.
1479///
1480/// There are two ways to invoke this macro. The first one uses
1481/// [`default_config_transform`]/[`FromStr`] to parse the data:
1482///
1483/// ```ignore
/// config_field!(MyType);
1485/// ```
1486///
1487/// Note that the parsing error MUST implement [`std::error::Error`]!
1488///
/// Or you can specify how to parse a [`str`] into the type:
1490///
1491/// ```ignore
1492/// fn parse_it(s: &str) -> Result<MyType> {
1493/// ...
1494/// }
1495///
/// config_field!(
1497/// MyType,
1498/// value => parse_it(value)
1499/// );
1500/// ```
1501#[macro_export]
1502macro_rules! config_field {
1503 ($t:ty) => {
1504 config_field!($t, value => $crate::config::default_config_transform(value)?);
1505 };
1506
1507 ($t:ty, $arg:ident => $transform:expr) => {
1508 impl $crate::config::ConfigField for $t {
1509 fn visit<V: $crate::config::Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1510 v.some(key, self, description)
1511 }
1512
1513 fn set(&mut self, _: &str, $arg: &str) -> $crate::error::Result<()> {
1514 *self = $transform;
1515 Ok(())
1516 }
1517 }
1518 };
1519}
1520
1521config_field!(String);
1522config_field!(bool, value => default_config_transform(value.to_lowercase().as_str())?);
1523config_field!(usize);
1524config_field!(f64);
1525config_field!(u64);
1526
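// A `u8` can be set either from a numeric string (e.g. "59") or from a single
// ASCII character (e.g. ";"), which is how single-character CSV options such
// as `format.delimiter` accept values like ",".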
1527impl ConfigField for u8 {
1528 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1529 v.some(key, self, description)
1530 }
1531
1532 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1533 if value.is_empty() {
1534 return Err(DataFusionError::Configuration(format!(
1535 "Input string for {key} key is empty"
1536 )));
1537 }
1538 // Check if the string is a valid number
1539 if let Ok(num) = value.parse::<u8>() {
1540 // TODO: Let's decide how we treat the numerical strings.
1541 *self = num;
1542 } else {
1543 let bytes = value.as_bytes();
            // Otherwise the value must be a single ASCII character (one byte)
1545 if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
1546 return Err(DataFusionError::Configuration(format!(
1547 "Error parsing {value} as u8. Non-ASCII string provided"
1548 )));
1549 }
1550 *self = bytes[0];
1551 }
1552 Ok(())
1553 }
1554}
1555
1556impl ConfigField for CompressionTypeVariant {
1557 fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
1558 v.some(key, self, description)
1559 }
1560
1561 fn set(&mut self, _: &str, value: &str) -> Result<()> {
1562 *self = CompressionTypeVariant::from_str(value)?;
1563 Ok(())
1564 }
1565}
1566
1567/// An implementation trait used to recursively walk configuration
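///
/// # Example
///
/// A sketch of a visitor that collects every key it sees, mirroring the
/// `Visitor` structs used by [`ConfigOptions::entries`] in this module.
///
/// ```ignore
/// struct KeyCollector(Vec<String>);
///
/// impl Visit for KeyCollector {
///     fn some<V: std::fmt::Display>(&mut self, key: &str, _value: V, _desc: &'static str) {
///         self.0.push(key.to_string());
///     }
///
///     fn none(&mut self, key: &str, _desc: &'static str) {
///         self.0.push(key.to_string());
///     }
/// }
/// ```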
1568pub trait Visit {
1569 fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
1570
1571 fn none(&mut self, key: &str, description: &'static str);
1572}
1573
/// Convenience macro to create [`ExtensionOptions`].
1575///
1576/// The created structure implements the following traits:
1577///
1578/// - [`Clone`]
1579/// - [`Debug`]
1580/// - [`Default`]
1581/// - [`ExtensionOptions`]
1582///
1583/// # Usage
1584/// The syntax is:
1585///
1586/// ```text
1587/// extensions_options! {
1588/// /// Struct docs (optional).
1589/// [<vis>] struct <StructName> {
1590/// /// Field docs (optional)
1591/// [<vis>] <field_name>: <field_type>, default = <default_value>
1592///
1593/// ... more fields
1594/// }
1595/// }
1596/// ```
1597///
1598/// The placeholders are:
1599/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
1600/// - `<StructName>`: Struct name like `MyStruct`.
1601/// - `<field_name>`: Field name like `my_field`.
1602/// - `<field_type>`: Field type like `u8`.
1603/// - `<default_value>`: Default value matching the field type like `42`.
1604///
1605/// # Example
1606/// See also a full example on the [`ConfigExtension`] documentation
1607///
1608/// ```
1609/// use datafusion_common::extensions_options;
1610///
1611/// extensions_options! {
1612/// /// My own config options.
1613/// pub struct MyConfig {
1614/// /// Should "foo" be replaced by "bar"?
1615/// pub foo_to_bar: bool, default = true
1616///
1617/// /// How many "baz" should be created?
1618/// pub baz_count: usize, default = 1337
1619/// }
1620/// }
1621/// ```
1622///
1623///
1624/// [`Debug`]: std::fmt::Debug
/// [`ExtensionOptions`]: crate::config::ExtensionOptions
1626#[macro_export]
1627macro_rules! extensions_options {
1628 (
1629 $(#[doc = $struct_d:tt])*
1630 $vis:vis struct $struct_name:ident {
1631 $(
1632 $(#[doc = $d:tt])*
1633 $field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
1634 )*$(,)*
1635 }
1636 ) => {
1637 $(#[doc = $struct_d])*
1638 #[derive(Debug, Clone)]
1639 #[non_exhaustive]
1640 $vis struct $struct_name{
1641 $(
1642 $(#[doc = $d])*
1643 $field_vis $field_name : $field_type,
1644 )*
1645 }
1646
1647 impl Default for $struct_name {
1648 fn default() -> Self {
1649 Self {
1650 $($field_name: $default),*
1651 }
1652 }
1653 }
1654
1655 impl $crate::config::ExtensionOptions for $struct_name {
1656 fn as_any(&self) -> &dyn ::std::any::Any {
1657 self
1658 }
1659
1660 fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
1661 self
1662 }
1663
1664 fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
1665 Box::new(self.clone())
1666 }
1667
1668 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1669 $crate::config::ConfigField::set(self, key, value)
1670 }
1671
1672 fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
1673 struct Visitor(Vec<$crate::config::ConfigEntry>);
1674
1675 impl $crate::config::Visit for Visitor {
1676 fn some<V: std::fmt::Display>(
1677 &mut self,
1678 key: &str,
1679 value: V,
1680 description: &'static str,
1681 ) {
1682 self.0.push($crate::config::ConfigEntry {
1683 key: key.to_string(),
1684 value: Some(value.to_string()),
1685 description,
1686 })
1687 }
1688
1689 fn none(&mut self, key: &str, description: &'static str) {
1690 self.0.push($crate::config::ConfigEntry {
1691 key: key.to_string(),
1692 value: None,
1693 description,
1694 })
1695 }
1696 }
1697
1698 let mut v = Visitor(vec![]);
1699 // The prefix is not used for extensions.
1700 // The description is generated in ConfigField::visit.
1701 // We can just pass empty strings here.
1702 $crate::config::ConfigField::visit(self, &mut v, "", "");
1703 v.0
1704 }
1705 }
1706
1707 impl $crate::config::ConfigField for $struct_name {
1708 fn set(&mut self, key: &str, value: &str) -> $crate::error::Result<()> {
1709 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1710 match key {
1711 $(
1712 stringify!($field_name) => {
                            // Allow setting deprecated fields without emitting warnings
1715 {
1716 #[allow(deprecated)]
1717 self.$field_name.set(rem, value.as_ref())
1718 }
1719 },
1720 )*
1721 _ => return $crate::error::_config_err!(
1722 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
1723 )
1724 }
1725 }
1726
1727 fn visit<V: $crate::config::Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1728 $(
1729 let key = stringify!($field_name).to_string();
1730 let desc = concat!($($d),*).trim();
1731 #[allow(deprecated)]
1732 self.$field_name.visit(v, key.as_str(), desc);
1733 )*
1734 }
1735 }
1736 }
1737}
1738
/// These file types have special built-in behavior for configuration.
/// Use [`TableOptions::extensions`] for configuring other file types.
1741#[derive(Debug, Clone)]
1742pub enum ConfigFileType {
1743 CSV,
1744 #[cfg(feature = "parquet")]
1745 PARQUET,
1746 JSON,
1747}
1748
1749/// Represents the configuration options available for handling different table formats within a data processing application.
1750/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
1751/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
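///
/// # Example
///
/// A minimal sketch, mirroring this module's tests: select a file format and
/// set a format-scoped option.
///
/// ```ignore
/// let mut options = TableOptions::new();
/// options.set_config_format(ConfigFileType::CSV);
/// options.set("format.delimiter", ";").unwrap();
/// assert_eq!(options.csv.delimiter, b';');
/// ```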
1752#[derive(Debug, Clone, Default)]
1753pub struct TableOptions {
1754 /// Configuration options for CSV file handling. This includes settings like the delimiter,
1755 /// quote character, and whether the first row is considered as headers.
1756 pub csv: CsvOptions,
1757
1758 /// Configuration options for Parquet file handling. This includes settings for compression,
1759 /// encoding, and other Parquet-specific file characteristics.
1760 pub parquet: TableParquetOptions,
1761
1762 /// Configuration options for JSON file handling.
1763 pub json: JsonOptions,
1764
1765 /// The current file format that the table operations should assume. This option allows
1766 /// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
1767 pub current_format: Option<ConfigFileType>,
1768
1769 /// Optional extensions that can be used to extend or customize the behavior of the table
1770 /// options. Extensions can be registered using `Extensions::insert` and might include
1771 /// custom file handling logic, additional configuration parameters, or other enhancements.
1772 pub extensions: Extensions,
1773}
1774
1775impl ConfigField for TableOptions {
1776 /// Visits configuration settings for the current file format, or all formats if none is selected.
1777 ///
1778 /// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
1779 /// If a format is selected, it visits only the settings relevant to that format. Otherwise,
1780 /// it visits all available format settings.
1781 fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
1782 if let Some(file_type) = &self.current_format {
1783 match file_type {
1784 #[cfg(feature = "parquet")]
1785 ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
1786 ConfigFileType::CSV => self.csv.visit(v, "format", ""),
1787 ConfigFileType::JSON => self.json.visit(v, "format", ""),
1788 }
1789 } else {
1790 self.csv.visit(v, "csv", "");
1791 self.parquet.visit(v, "parquet", "");
1792 self.json.visit(v, "json", "");
1793 }
1794 }
1795
1796 /// Sets a configuration value for a specific key within `TableOptions`.
1797 ///
1798 /// This method delegates setting configuration values to the specific file format configurations,
1799 /// based on the current format selected. If no format is selected, it returns an error.
1800 ///
1801 /// # Parameters
1802 ///
1803 /// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
1804 /// for CSV format.
1805 /// * `value`: The value to set for the specified configuration key.
1806 ///
1807 /// # Returns
1808 ///
1809 /// A result indicating success or an error if the key is not recognized, if a format is not specified,
1810 /// or if setting the configuration value fails for the specific format.
1811 fn set(&mut self, key: &str, value: &str) -> Result<()> {
1812 // Extensions are handled in the public `ConfigOptions::set`
1813 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
1814 match key {
1815 "format" => {
1816 let Some(format) = &self.current_format else {
1817 return _config_err!("Specify a format for TableOptions");
1818 };
1819 match format {
1820 #[cfg(feature = "parquet")]
1821 ConfigFileType::PARQUET => self.parquet.set(rem, value),
1822 ConfigFileType::CSV => self.csv.set(rem, value),
1823 ConfigFileType::JSON => self.json.set(rem, value),
1824 }
1825 }
1826 _ => _config_err!("Config value \"{key}\" not found on TableOptions"),
1827 }
1828 }
1829}
1830
1831impl TableOptions {
1832 /// Constructs a new instance of `TableOptions` with default settings.
1833 ///
1834 /// # Returns
1835 ///
1836 /// A new `TableOptions` instance with default configuration values.
1837 pub fn new() -> Self {
1838 Self::default()
1839 }
1840
1841 /// Creates a new `TableOptions` instance initialized with settings from a given session config.
1842 ///
1843 /// # Parameters
1844 ///
1845 /// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
1846 ///
1847 /// # Returns
1848 ///
1849 /// A new `TableOptions` instance with settings applied from the session config.
1850 pub fn default_from_session_config(config: &ConfigOptions) -> Self {
1851 let initial = TableOptions::default();
1852 initial.combine_with_session_config(config)
1853 }
1854
1855 /// Updates the current `TableOptions` with settings from a given session config.
1856 ///
1857 /// # Parameters
1858 ///
1859 /// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
1860 ///
1861 /// # Returns
1862 ///
1863 /// A new `TableOptions` instance with updated settings from the session config.
1864 #[must_use = "this method returns a new instance"]
1865 pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
1866 let mut clone = self.clone();
1867 clone.parquet.global = config.execution.parquet.clone();
1868 clone
1869 }
1870
1871 /// Sets the file format for the table.
1872 ///
1873 /// # Parameters
1874 ///
1875 /// * `format`: The file format to use (e.g., CSV, Parquet).
1876 pub fn set_config_format(&mut self, format: ConfigFileType) {
1877 self.current_format = Some(format);
1878 }
1879
1880 /// Sets the extensions for this `TableOptions` instance.
1881 ///
1882 /// # Parameters
1883 ///
1884 /// * `extensions`: The `Extensions` instance to set.
1885 ///
1886 /// # Returns
1887 ///
1888 /// A new `TableOptions` instance with the specified extensions applied.
1889 pub fn with_extensions(mut self, extensions: Extensions) -> Self {
1890 self.extensions = extensions;
1891 self
1892 }
1893
1894 /// Sets a specific configuration option.
1895 ///
1896 /// # Parameters
1897 ///
1898 /// * `key`: The configuration key (e.g., "format.delimiter").
1899 /// * `value`: The value to set for the specified key.
1900 ///
1901 /// # Returns
1902 ///
1903 /// A result indicating success or failure in setting the configuration option.
1904 pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
1905 let Some((prefix, _)) = key.split_once('.') else {
1906 return _config_err!("could not find config namespace for key \"{key}\"");
1907 };
1908
1909 if prefix == "format" {
1910 return ConfigField::set(self, key, value);
1911 }
1912
1913 if prefix == "execution" {
1914 return Ok(());
1915 }
1916
1917 let Some(e) = self.extensions.0.get_mut(prefix) else {
1918 return _config_err!("Could not find config namespace \"{prefix}\"");
1919 };
1920 e.0.set(key, value)
1921 }
1922
1923 /// Initializes a new `TableOptions` from a hash map of string settings.
1924 ///
1925 /// # Parameters
1926 ///
1927 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1928 ///
1929 /// # Returns
1930 ///
1931 /// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
1932 pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
1933 let mut ret = Self::default();
1934 for (k, v) in settings {
1935 ret.set(k, v)?;
1936 }
1937
1938 Ok(ret)
1939 }
1940
1941 /// Modifies the current `TableOptions` instance with settings from a hash map.
1942 ///
1943 /// # Parameters
1944 ///
1945 /// * `settings`: A hash map where each key-value pair represents a configuration setting.
1946 ///
1947 /// # Returns
1948 ///
1949 /// A result indicating success or failure in applying the settings.
1950 pub fn alter_with_string_hash_map(
1951 &mut self,
1952 settings: &HashMap<String, String>,
1953 ) -> Result<()> {
1954 for (k, v) in settings {
1955 self.set(k, v)?;
1956 }
1957 Ok(())
1958 }
1959
1960 /// Retrieves all configuration entries from this `TableOptions`.
1961 ///
1962 /// # Returns
1963 ///
1964 /// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
1965 pub fn entries(&self) -> Vec<ConfigEntry> {
1966 struct Visitor(Vec<ConfigEntry>);
1967
1968 impl Visit for Visitor {
1969 fn some<V: Display>(
1970 &mut self,
1971 key: &str,
1972 value: V,
1973 description: &'static str,
1974 ) {
1975 self.0.push(ConfigEntry {
1976 key: key.to_string(),
1977 value: Some(value.to_string()),
1978 description,
1979 })
1980 }
1981
1982 fn none(&mut self, key: &str, description: &'static str) {
1983 self.0.push(ConfigEntry {
1984 key: key.to_string(),
1985 value: None,
1986 description,
1987 })
1988 }
1989 }
1990
1991 let mut v = Visitor(vec![]);
1992 self.visit(&mut v, "format", "");
1993
1994 v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
1995 v.0
1996 }
1997}
1998
1999/// Options that control how Parquet files are read, including global options
2000/// that apply to all columns and optional column-specific overrides
2001///
2002/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
2003/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
2004/// (e.g. sorting_columns).
2005#[derive(Clone, Default, Debug, PartialEq)]
2006pub struct TableParquetOptions {
    /// Global Parquet options that propagate to all columns.
2008 pub global: ParquetOptions,
    /// Column specific options. Keys take the form `parquet.<option>::<column>`.
2010 pub column_specific_options: HashMap<String, ParquetColumnOptions>,
2011 /// Additional file-level metadata to include. Inserted into the key_value_metadata
2012 /// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
2013 ///
2014 /// Multiple entries are permitted
2015 /// ```sql
2016 /// OPTIONS (
2017 /// 'format.metadata::key1' '',
2018 /// 'format.metadata::key2' 'value',
2019 /// 'format.metadata::key3' 'value has spaces',
2020 /// 'format.metadata::key4' 'value has special chars :: :',
2021 /// 'format.metadata::key_dupe' 'original will be overwritten',
2022 /// 'format.metadata::key_dupe' 'final'
2023 /// )
2024 /// ```
2025 pub key_value_metadata: HashMap<String, Option<String>>,
2026 /// Options for configuring Parquet modular encryption
2027 ///
2028 /// To use Parquet encryption, you must enable the `parquet_encryption` feature flag, as it is not activated by default.
    /// See [`ConfigFileEncryptionProperties`] and [`ConfigFileDecryptionProperties`] in this module.
2030 /// These can be set via 'format.crypto', for example:
2031 /// ```sql
2032 /// OPTIONS (
2033 /// 'format.crypto.file_encryption.encrypt_footer' 'true',
2034 /// 'format.crypto.file_encryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345" */
2035 /// 'format.crypto.file_encryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2036 /// 'format.crypto.file_encryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2037 /// -- Same for decryption
2038 /// 'format.crypto.file_decryption.footer_key_as_hex' '30313233343536373839303132333435', -- b"0123456789012345"
2039 /// 'format.crypto.file_decryption.column_key_as_hex::double_field' '31323334353637383930313233343530', -- b"1234567890123450"
2040 /// 'format.crypto.file_decryption.column_key_as_hex::float_field' '31323334353637383930313233343531', -- b"1234567890123451"
2041 /// )
2042 /// ```
2043 /// See datafusion-cli/tests/sql/encrypted_parquet.sql for a more complete example.
    /// Note that keys must be provided in hex format since they are binary strings.
2045 pub crypto: ParquetEncryptionOptions,
2046}
2047
2048impl TableParquetOptions {
2049 /// Return new default TableParquetOptions
2050 pub fn new() -> Self {
2051 Self::default()
2052 }
2053
    /// Set whether to skip encoding the arrow metadata when writing parquet.
2056 ///
2057 /// Default is to encode the arrow schema in the file kv_metadata.
2058 pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
2059 Self {
2060 global: ParquetOptions {
2061 skip_arrow_metadata: skip,
2062 ..self.global
2063 },
2064 ..self
2065 }
2066 }
2067
2068 /// Retrieves all configuration entries from this `TableParquetOptions`.
2069 ///
2070 /// # Returns
2071 ///
2072 /// A vector of `ConfigEntry` instances, representing all the configuration options within this
    pub fn entries(&self) -> Vec<ConfigEntry> {
2074 struct Visitor(Vec<ConfigEntry>);
2075
2076 impl Visit for Visitor {
2077 fn some<V: Display>(
2078 &mut self,
2079 key: &str,
2080 value: V,
2081 description: &'static str,
2082 ) {
2083 self.0.push(ConfigEntry {
2084 key: key[1..].to_string(),
2085 value: Some(value.to_string()),
2086 description,
2087 })
2088 }
2089
2090 fn none(&mut self, key: &str, description: &'static str) {
2091 self.0.push(ConfigEntry {
2092 key: key[1..].to_string(),
2093 value: None,
2094 description,
2095 })
2096 }
2097 }
2098
2099 let mut v = Visitor(vec![]);
2100 self.visit(&mut v, "", "");
2101
2102 v.0
2103 }
2104}
2105
2106impl ConfigField for TableParquetOptions {
2107 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
2108 self.global.visit(v, key_prefix, description);
2109 self.column_specific_options
2110 .visit(v, key_prefix, description);
2111 self.crypto
2112 .visit(v, &format!("{key_prefix}.crypto"), description);
2113 }
2114
2115 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2116 // Determine if the key is a global, metadata, or column-specific setting
2117 if key.starts_with("metadata::") {
2118 let k = match key.split("::").collect::<Vec<_>>()[..] {
2119 [_meta] | [_meta, ""] => {
2120 return _config_err!(
2121 "Invalid metadata key provided, missing key in metadata::<key>"
2122 )
2123 }
2124 [_meta, k] => k.into(),
2125 _ => {
2126 return _config_err!(
2127 "Invalid metadata key provided, found too many '::' in \"{key}\""
2128 )
2129 }
2130 };
2131 self.key_value_metadata.insert(k, Some(value.into()));
2132 Ok(())
2133 } else if let Some(crypto_feature) = key.strip_prefix("crypto.") {
2134 self.crypto.set(crypto_feature, value)
2135 } else if key.contains("::") {
2136 self.column_specific_options.set(key, value)
2137 } else {
2138 self.global.set(key, value)
2139 }
2140 }
2141}
2142
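/// Generates a config struct like `config_namespace!`, and additionally
/// implements [`ConfigField`] for `HashMap<String, TheStruct>` so that
/// per-column overrides can be addressed as `<option>::<column name>`.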
2143macro_rules! config_namespace_with_hashmap {
2144 (
2145 $(#[doc = $struct_d:tt])*
2146 $(#[deprecated($($struct_depr:tt)*)])? // Optional struct-level deprecated attribute
2147 $vis:vis struct $struct_name:ident {
2148 $(
2149 $(#[doc = $d:tt])*
2150 $(#[deprecated($($field_depr:tt)*)])? // Optional field-level deprecated attribute
2151 $field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
2152 )*$(,)*
2153 }
2154 ) => {
2155
2156 $(#[doc = $struct_d])*
2157 $(#[deprecated($($struct_depr)*)])? // Apply struct deprecation
2158 #[derive(Debug, Clone, PartialEq)]
2159 $vis struct $struct_name{
2160 $(
2161 $(#[doc = $d])*
2162 $(#[deprecated($($field_depr)*)])? // Apply field deprecation
2163 $field_vis $field_name : $field_type,
2164 )*
2165 }
2166
2167 impl ConfigField for $struct_name {
2168 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2169 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2170 match key {
2171 $(
2172 stringify!($field_name) => {
2173 // Handle deprecated fields
2174 #[allow(deprecated)] // Allow deprecated fields
2175 $(let value = $transform(value);)?
2176 self.$field_name.set(rem, value.as_ref())
2177 },
2178 )*
2179 _ => _config_err!(
2180 "Config value \"{}\" not found on {}", key, stringify!($struct_name)
2181 )
2182 }
2183 }
2184
2185 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2186 $(
2187 let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
2188 let desc = concat!($($d),*).trim();
2189 // Handle deprecated fields
2190 #[allow(deprecated)]
2191 self.$field_name.visit(v, key.as_str(), desc);
2192 )*
2193 }
2194 }
2195
2196 impl Default for $struct_name {
2197 fn default() -> Self {
2198 #[allow(deprecated)]
2199 Self {
2200 $($field_name: $default),*
2201 }
2202 }
2203 }
2204
2205 impl ConfigField for HashMap<String,$struct_name> {
2206 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2207 let parts: Vec<&str> = key.splitn(2, "::").collect();
2208 match parts.as_slice() {
2209 [inner_key, hashmap_key] => {
2210 // Get or create the struct for the specified key
2211 let inner_value = self
2212 .entry((*hashmap_key).to_owned())
2213 .or_insert_with($struct_name::default);
2214
2215 inner_value.set(inner_key, value)
2216 }
2217 _ => _config_err!("Unrecognized key '{key}'."),
2218 }
2219 }
2220
2221 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2222 for (column_name, col_options) in self {
2223 $(
2224 let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
2225 let desc = concat!($($d),*).trim();
2226 #[allow(deprecated)]
2227 col_options.$field_name.visit(v, key.as_str(), desc);
2228 )*
2229 }
2230 }
2231 }
2232 }
2233}
2234
2235config_namespace_with_hashmap! {
2236 /// Options controlling parquet format for individual columns.
2237 ///
2238 /// See [`ParquetOptions`] for more details
2239 pub struct ParquetColumnOptions {
2240 /// Sets if bloom filter is enabled for the column path.
2241 pub bloom_filter_enabled: Option<bool>, default = None
2242
2243 /// Sets encoding for the column path.
2244 /// Valid values are: plain, plain_dictionary, rle,
2245 /// bit_packed, delta_binary_packed, delta_length_byte_array,
2246 /// delta_byte_array, rle_dictionary, and byte_stream_split.
2247 /// These values are not case-sensitive. If NULL, uses
2248 /// default parquet options
2249 pub encoding: Option<String>, default = None
2250
2251 /// Sets if dictionary encoding is enabled for the column path. If NULL, uses
2252 /// default parquet options
2253 pub dictionary_enabled: Option<bool>, default = None
2254
2255 /// Sets default parquet compression codec for the column path.
2256 /// Valid values are: uncompressed, snappy, gzip(level),
2257 /// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
2258 /// These values are not case-sensitive. If NULL, uses
2259 /// default parquet options
2260 pub compression: Option<String>, transform = str::to_lowercase, default = None
2261
2262 /// Sets if statistics are enabled for the column
2263 /// Valid values are: "none", "chunk", and "page"
2264 /// These values are not case sensitive. If NULL, uses
2265 /// default parquet options
2266 pub statistics_enabled: Option<String>, default = None
2267
2268 /// Sets bloom filter false positive probability for the column path. If NULL, uses
2269 /// default parquet options
2270 pub bloom_filter_fpp: Option<f64>, default = None
2271
2272 /// Sets bloom filter number of distinct values. If NULL, uses
2273 /// default parquet options
2274 pub bloom_filter_ndv: Option<u64>, default = None
2275 }
2276}
2277
2278#[derive(Clone, Debug, PartialEq)]
2279pub struct ConfigFileEncryptionProperties {
2280 /// Should the parquet footer be encrypted
2281 /// default is true
2282 pub encrypt_footer: bool,
2283 /// Key to use for the parquet footer encoded in hex format
2284 pub footer_key_as_hex: String,
2285 /// Metadata information for footer key
2286 pub footer_key_metadata_as_hex: String,
2287 /// HashMap of column names --> (key in hex format, metadata)
2288 pub column_encryption_properties: HashMap<String, ColumnEncryptionProperties>,
2289 /// AAD prefix string uniquely identifies the file and prevents file swapping
2290 pub aad_prefix_as_hex: String,
2291 /// If true, store the AAD prefix in the file
2292 /// default is false
2293 pub store_aad_prefix: bool,
2294}
2295
2296// Setup to match EncryptionPropertiesBuilder::new()
2297impl Default for ConfigFileEncryptionProperties {
2298 fn default() -> Self {
2299 ConfigFileEncryptionProperties {
2300 encrypt_footer: true,
2301 footer_key_as_hex: String::new(),
2302 footer_key_metadata_as_hex: String::new(),
2303 column_encryption_properties: Default::default(),
2304 aad_prefix_as_hex: String::new(),
2305 store_aad_prefix: false,
2306 }
2307 }
2308}
2309
2310config_namespace_with_hashmap! {
2311 pub struct ColumnEncryptionProperties {
2312 /// Per column encryption key
2313 pub column_key_as_hex: String, default = "".to_string()
2314 /// Per column encryption key metadata
2315 pub column_metadata_as_hex: Option<String>, default = None
2316 }
2317}
2318
2319impl ConfigField for ConfigFileEncryptionProperties {
2320 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2321 let key = format!("{key_prefix}.encrypt_footer");
2322 let desc = "Encrypt the footer";
2323 self.encrypt_footer.visit(v, key.as_str(), desc);
2324
2325 let key = format!("{key_prefix}.footer_key_as_hex");
2326 let desc = "Key to use for the parquet footer";
2327 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2328
2329 let key = format!("{key_prefix}.footer_key_metadata_as_hex");
2330 let desc = "Metadata to use for the parquet footer";
2331 self.footer_key_metadata_as_hex.visit(v, key.as_str(), desc);
2332
2333 self.column_encryption_properties.visit(v, key_prefix, desc);
2334
2335 let key = format!("{key_prefix}.aad_prefix_as_hex");
2336 let desc = "AAD prefix to use";
2337 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2338
2339 let key = format!("{key_prefix}.store_aad_prefix");
2340 let desc = "If true, store the AAD prefix";
        self.store_aad_prefix.visit(v, key.as_str(), desc);
2344 }
2345
2346 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2347 // Any hex encoded values must be pre-encoded using
2348 // hex::encode() before calling set.
2349
2350 if key.contains("::") {
2351 // Handle any column specific properties
2352 return self.column_encryption_properties.set(key, value);
2353 };
2354
2355 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2356 match key {
2357 "encrypt_footer" => self.encrypt_footer.set(rem, value.as_ref()),
2358 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2359 "footer_key_metadata_as_hex" => {
2360 self.footer_key_metadata_as_hex.set(rem, value.as_ref())
2361 }
2362 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2363 "store_aad_prefix" => self.store_aad_prefix.set(rem, value.as_ref()),
2364 _ => _config_err!(
2365 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2366 key
2367 ),
2368 }
2369 }
2370}
2371
2372#[cfg(feature = "parquet_encryption")]
2373impl From<ConfigFileEncryptionProperties> for FileEncryptionProperties {
2374 fn from(val: ConfigFileEncryptionProperties) -> Self {
2375 let mut fep = FileEncryptionProperties::builder(
2376 hex::decode(val.footer_key_as_hex).unwrap(),
2377 )
2378 .with_plaintext_footer(!val.encrypt_footer)
2379 .with_aad_prefix_storage(val.store_aad_prefix);
2380
2381 if !val.footer_key_metadata_as_hex.is_empty() {
2382 fep = fep.with_footer_key_metadata(
2383 hex::decode(&val.footer_key_metadata_as_hex)
2384 .expect("Invalid footer key metadata"),
2385 );
2386 }
2387
2388 for (column_name, encryption_props) in val.column_encryption_properties.iter() {
2389 let encryption_key = hex::decode(&encryption_props.column_key_as_hex)
2390 .expect("Invalid column encryption key");
2391 let key_metadata = encryption_props
2392 .column_metadata_as_hex
2393 .as_ref()
2394 .map(|x| hex::decode(x).expect("Invalid column metadata"));
2395 match key_metadata {
2396 Some(key_metadata) => {
2397 fep = fep.with_column_key_and_metadata(
2398 column_name,
2399 encryption_key,
2400 key_metadata,
2401 );
2402 }
2403 None => {
2404 fep = fep.with_column_key(column_name, encryption_key);
2405 }
2406 }
2407 }
2408
2409 if !val.aad_prefix_as_hex.is_empty() {
2410 let aad_prefix: Vec<u8> =
2411 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2412 fep = fep.with_aad_prefix(aad_prefix);
2413 }
2414 Arc::unwrap_or_clone(fep.build().unwrap())
2415 }
2416}
2417
2418#[cfg(feature = "parquet_encryption")]
2419impl From<&Arc<FileEncryptionProperties>> for ConfigFileEncryptionProperties {
2420 fn from(f: &Arc<FileEncryptionProperties>) -> Self {
2421 let (column_names_vec, column_keys_vec, column_metas_vec) = f.column_keys();
2422
2423 let mut column_encryption_properties: HashMap<
2424 String,
2425 ColumnEncryptionProperties,
2426 > = HashMap::new();
2427
2428 for (i, column_name) in column_names_vec.iter().enumerate() {
2429 let column_key_as_hex = hex::encode(&column_keys_vec[i]);
2430 let column_metadata_as_hex: Option<String> =
2431 column_metas_vec.get(i).map(hex::encode);
2432 column_encryption_properties.insert(
2433 column_name.clone(),
2434 ColumnEncryptionProperties {
2435 column_key_as_hex,
2436 column_metadata_as_hex,
2437 },
2438 );
2439 }
2440 let mut aad_prefix: Vec<u8> = Vec::new();
2441 if let Some(prefix) = f.aad_prefix() {
2442 aad_prefix = prefix.clone();
2443 }
2444 ConfigFileEncryptionProperties {
2445 encrypt_footer: f.encrypt_footer(),
2446 footer_key_as_hex: hex::encode(f.footer_key()),
2447 footer_key_metadata_as_hex: f
2448 .footer_key_metadata()
2449 .map(hex::encode)
2450 .unwrap_or_default(),
2451 column_encryption_properties,
2452 aad_prefix_as_hex: hex::encode(aad_prefix),
2453 store_aad_prefix: f.store_aad_prefix(),
2454 }
2455 }
2456}
2457
2458#[derive(Clone, Debug, PartialEq)]
2459pub struct ConfigFileDecryptionProperties {
2460 /// Binary string to use for the parquet footer encoded in hex format
2461 pub footer_key_as_hex: String,
2462 /// HashMap of column names --> key in hex format
2463 pub column_decryption_properties: HashMap<String, ColumnDecryptionProperties>,
2464 /// AAD prefix string uniquely identifies the file and prevents file swapping
2465 pub aad_prefix_as_hex: String,
2466 /// If true, then verify signature for files with plaintext footers.
2467 /// default = true
2468 pub footer_signature_verification: bool,
2469}
2470
2471config_namespace_with_hashmap! {
2472 pub struct ColumnDecryptionProperties {
2473 /// Per column encryption key
2474 pub column_key_as_hex: String, default = "".to_string()
2475 }
2476}
2477
2478// Setup to match DecryptionPropertiesBuilder::new()
2479impl Default for ConfigFileDecryptionProperties {
2480 fn default() -> Self {
2481 ConfigFileDecryptionProperties {
2482 footer_key_as_hex: String::new(),
2483 column_decryption_properties: Default::default(),
2484 aad_prefix_as_hex: String::new(),
2485 footer_signature_verification: true,
2486 }
2487 }
2488}
2489
2490impl ConfigField for ConfigFileDecryptionProperties {
2491 fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
2492 let key = format!("{key_prefix}.footer_key_as_hex");
2493 let desc = "Key to use for the parquet footer";
2494 self.footer_key_as_hex.visit(v, key.as_str(), desc);
2495
2496 let key = format!("{key_prefix}.aad_prefix_as_hex");
2497 let desc = "AAD prefix to use";
2498 self.aad_prefix_as_hex.visit(v, key.as_str(), desc);
2499
2500 let key = format!("{key_prefix}.footer_signature_verification");
2501 let desc = "If true, verify the footer signature";
2502 self.footer_signature_verification
2503 .visit(v, key.as_str(), desc);
2504
2505 self.column_decryption_properties.visit(v, key_prefix, desc);
2506 }
2507
2508 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2509 // Any hex encoded values must be pre-encoded using
2510 // hex::encode() before calling set.
2511
2512 if key.contains("::") {
2513 // Handle any column specific properties
2514 return self.column_decryption_properties.set(key, value);
2515 };
2516
2517 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2518 match key {
2519 "footer_key_as_hex" => self.footer_key_as_hex.set(rem, value.as_ref()),
2520 "aad_prefix_as_hex" => self.aad_prefix_as_hex.set(rem, value.as_ref()),
2521 "footer_signature_verification" => {
2522 self.footer_signature_verification.set(rem, value.as_ref())
2523 }
2524 _ => _config_err!(
2525 "Config value \"{}\" not found on ConfigFileEncryptionProperties",
2526 key
2527 ),
2528 }
2529 }
2530}
2531
2532#[cfg(feature = "parquet_encryption")]
2533impl From<ConfigFileDecryptionProperties> for FileDecryptionProperties {
2534 fn from(val: ConfigFileDecryptionProperties) -> Self {
2535 let mut column_names: Vec<&str> = Vec::new();
2536 let mut column_keys: Vec<Vec<u8>> = Vec::new();
2537
2538 for (col_name, decryption_properties) in val.column_decryption_properties.iter() {
2539 column_names.push(col_name.as_str());
2540 column_keys.push(
2541 hex::decode(&decryption_properties.column_key_as_hex)
2542 .expect("Invalid column decryption key"),
2543 );
2544 }
2545
2546 let mut fep = FileDecryptionProperties::builder(
2547 hex::decode(val.footer_key_as_hex).expect("Invalid footer key"),
2548 )
2549 .with_column_keys(column_names, column_keys)
2550 .unwrap();
2551
2552 if !val.footer_signature_verification {
2553 fep = fep.disable_footer_signature_verification();
2554 }
2555
2556 if !val.aad_prefix_as_hex.is_empty() {
2557 let aad_prefix =
2558 hex::decode(&val.aad_prefix_as_hex).expect("Invalid AAD prefix");
2559 fep = fep.with_aad_prefix(aad_prefix);
2560 }
2561
2562 Arc::unwrap_or_clone(fep.build().unwrap())
2563 }
2564}
2565
2566#[cfg(feature = "parquet_encryption")]
2567impl From<&Arc<FileDecryptionProperties>> for ConfigFileDecryptionProperties {
2568 fn from(f: &Arc<FileDecryptionProperties>) -> Self {
2569 let (column_names_vec, column_keys_vec) = f.column_keys();
2570 let mut column_decryption_properties: HashMap<
2571 String,
2572 ColumnDecryptionProperties,
2573 > = HashMap::new();
2574 for (i, column_name) in column_names_vec.iter().enumerate() {
2575 let props = ColumnDecryptionProperties {
2576 column_key_as_hex: hex::encode(column_keys_vec[i].clone()),
2577 };
2578 column_decryption_properties.insert(column_name.clone(), props);
2579 }
2580
2581 let mut aad_prefix: Vec<u8> = Vec::new();
2582 if let Some(prefix) = f.aad_prefix() {
2583 aad_prefix = prefix.clone();
2584 }
2585 ConfigFileDecryptionProperties {
2586 footer_key_as_hex: hex::encode(
2587 f.footer_key(None).unwrap_or_default().as_ref(),
2588 ),
2589 column_decryption_properties,
2590 aad_prefix_as_hex: hex::encode(aad_prefix),
2591 footer_signature_verification: f.check_plaintext_footer_integrity(),
2592 }
2593 }
2594}
2595
2596/// Holds implementation-specific options for an encryption factory
2597#[derive(Clone, Debug, Default, PartialEq)]
2598pub struct EncryptionFactoryOptions {
2599 pub options: HashMap<String, String>,
2600}
2601
2602impl ConfigField for EncryptionFactoryOptions {
2603 fn visit<V: Visit>(&self, v: &mut V, key: &str, _description: &'static str) {
2604 for (option_key, option_value) in &self.options {
2605 v.some(
2606 &format!("{key}.{option_key}"),
2607 option_value,
2608 "Encryption factory specific option",
2609 );
2610 }
2611 }
2612
2613 fn set(&mut self, key: &str, value: &str) -> Result<()> {
2614 self.options.insert(key.to_owned(), value.to_owned());
2615 Ok(())
2616 }
2617}
2618
2619impl EncryptionFactoryOptions {
2620 /// Convert these encryption factory options to an [`ExtensionOptions`] instance.
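    ///
    /// # Example
    ///
    /// A sketch, assuming `MyFactoryConfig` is a hypothetical [`ExtensionOptions`]
    /// type (e.g. generated with `extensions_options!`) whose keys match the
    /// stored options.
    ///
    /// ```ignore
    /// let typed: MyFactoryConfig = encryption_factory_options.to_extension_options()?;
    /// ```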
2621 pub fn to_extension_options<T: ExtensionOptions + Default>(&self) -> Result<T> {
2622 let mut options = T::default();
2623 for (key, value) in &self.options {
2624 options.set(key, value)?;
2625 }
2626 Ok(options)
2627 }
2628}
2629
2630config_namespace! {
2631 /// Options controlling CSV format
2632 pub struct CsvOptions {
        /// Specifies whether there is a CSV header (i.e. the first line
        /// consists of column names). The value `None` indicates that
2635 /// the configuration should be consulted.
2636 pub has_header: Option<bool>, default = None
2637 pub delimiter: u8, default = b','
2638 pub quote: u8, default = b'"'
2639 pub terminator: Option<u8>, default = None
2640 pub escape: Option<u8>, default = None
2641 pub double_quote: Option<bool>, default = None
2642 /// Specifies whether newlines in (quoted) values are supported.
2643 ///
2644 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2645 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2646 /// parsed successfully, which may reduce performance.
2647 ///
2648 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2649 pub newlines_in_values: Option<bool>, default = None
2650 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2651 pub schema_infer_max_rec: Option<usize>, default = None
2652 pub date_format: Option<String>, default = None
2653 pub datetime_format: Option<String>, default = None
2654 pub timestamp_format: Option<String>, default = None
2655 pub timestamp_tz_format: Option<String>, default = None
2656 pub time_format: Option<String>, default = None
        /// The output format for Nulls in the CSV writer.
        pub null_value: Option<String>, default = None
        /// The input regex for Nulls when loading CSVs.
        pub null_regex: Option<String>, default = None
2661 pub comment: Option<u8>, default = None
2662 /// Whether to allow truncated rows when parsing, both within a single file and across files.
2663 ///
        /// When set to false (default), reading a single CSV file whose rows have different lengths will
        /// error; reading multiple CSV files with different numbers of columns will also fail.
2666 ///
2667 /// When set to true, reading a single CSV file with rows of different lengths will pad the truncated
2668 /// rows with null values for the missing columns; if reading multiple CSV files with different number
2669 /// of columns, it creates a union schema containing all columns found across the files, and will
2670 /// pad any files missing columns with null values for their rows.
2671 pub truncated_rows: Option<bool>, default = None
2672 }
2673}
2674
2675impl CsvOptions {
    /// Set the `CompressionTypeVariant` for the CSV file
    /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2678 pub fn with_compression(
2679 mut self,
2680 compression_type_variant: CompressionTypeVariant,
2681 ) -> Self {
2682 self.compression = compression_type_variant;
2683 self
2684 }
2685
2686 /// Set a limit in terms of records to scan to infer the schema
2687 /// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
2688 pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
2689 self.schema_infer_max_rec = Some(max_rec);
2690 self
2691 }
2692
2693 /// Set true to indicate that the first line is a header.
2694 /// - default to true
2695 pub fn with_has_header(mut self, has_header: bool) -> Self {
2696 self.has_header = Some(has_header);
2697 self
2698 }
2699
    /// Returns whether the first line is a header. If the format options do not
    /// specify whether there is a header, returns `None` (indicating that the
2702 /// configuration should be consulted).
2703 pub fn has_header(&self) -> Option<bool> {
2704 self.has_header
2705 }
2706
2707 /// The character separating values within a row.
2708 /// - default to ','
2709 pub fn with_delimiter(mut self, delimiter: u8) -> Self {
2710 self.delimiter = delimiter;
2711 self
2712 }
2713
2714 /// The quote character in a row.
2715 /// - default to '"'
2716 pub fn with_quote(mut self, quote: u8) -> Self {
2717 self.quote = quote;
2718 self
2719 }
2720
2721 /// The character that terminates a row.
2722 /// - default to None (CRLF)
2723 pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
2724 self.terminator = terminator;
2725 self
2726 }
2727
2728 /// The escape character in a row.
2729 /// - default is None
2730 pub fn with_escape(mut self, escape: Option<u8>) -> Self {
2731 self.escape = escape;
2732 self
2733 }
2734
2735 /// Set true to indicate that the CSV quotes should be doubled.
2736 /// - default to true
2737 pub fn with_double_quote(mut self, double_quote: bool) -> Self {
2738 self.double_quote = Some(double_quote);
2739 self
2740 }
2741
2742 /// Specifies whether newlines in (quoted) values are supported.
2743 ///
2744 /// Parsing newlines in quoted values may be affected by execution behaviour such as
2745 /// parallel file scanning. Setting this to `true` ensures that newlines in values are
2746 /// parsed successfully, which may reduce performance.
2747 ///
2748 /// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
2749 pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
2750 self.newlines_in_values = Some(newlines_in_values);
2751 self
2752 }
2753
2754 /// Set a `CompressionTypeVariant` of CSV
2755 /// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
2756 pub fn with_file_compression_type(
2757 mut self,
2758 compression: CompressionTypeVariant,
2759 ) -> Self {
2760 self.compression = compression;
2761 self
2762 }
2763
2764 /// Whether to allow truncated rows when parsing.
    /// By default this is false and reading will error if CSV rows have different lengths.
    /// When set to true, records with fewer than the expected number of columns are accepted
    /// and the missing columns are filled with nulls. If the schema is not nullable, an error
    /// is still returned.
2768 pub fn with_truncated_rows(mut self, allow: bool) -> Self {
2769 self.truncated_rows = Some(allow);
2770 self
2771 }
2772
2773 /// The delimiter character.
2774 pub fn delimiter(&self) -> u8 {
2775 self.delimiter
2776 }
2777
2778 /// The quote character.
2779 pub fn quote(&self) -> u8 {
2780 self.quote
2781 }
2782
2783 /// The terminator character.
2784 pub fn terminator(&self) -> Option<u8> {
2785 self.terminator
2786 }
2787
2788 /// The escape character.
2789 pub fn escape(&self) -> Option<u8> {
2790 self.escape
2791 }
2792}
2793
2794config_namespace! {
2795 /// Options controlling JSON format
2796 pub struct JsonOptions {
2797 pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
2798 pub schema_infer_max_rec: Option<usize>, default = None
2799 }
2800}
2801
2802pub trait OutputFormatExt: Display {}
2803
2804#[derive(Debug, Clone, PartialEq)]
2805#[allow(clippy::large_enum_variant)]
2806pub enum OutputFormat {
2807 CSV(CsvOptions),
2808 JSON(JsonOptions),
2809 #[cfg(feature = "parquet")]
2810 PARQUET(TableParquetOptions),
2811 AVRO,
2812 ARROW,
2813}
2814
2815impl Display for OutputFormat {
2816 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2817 let out = match self {
2818 OutputFormat::CSV(_) => "csv",
2819 OutputFormat::JSON(_) => "json",
2820 #[cfg(feature = "parquet")]
2821 OutputFormat::PARQUET(_) => "parquet",
2822 OutputFormat::AVRO => "avro",
2823 OutputFormat::ARROW => "arrow",
2824 };
2825 write!(f, "{out}")
2826 }
2827}
2828
2829#[cfg(test)]
2830mod tests {
2831 #[cfg(feature = "parquet")]
2832 use crate::config::TableParquetOptions;
2833 use crate::config::{
2834 ConfigEntry, ConfigExtension, ConfigField, ConfigFileType, ExtensionOptions,
2835 Extensions, TableOptions,
2836 };
2837 use std::any::Any;
2838 use std::collections::HashMap;
2839 use std::sync::Arc;
2840
2841 #[derive(Default, Debug, Clone)]
2842 pub struct TestExtensionConfig {
2843 /// Should "foo" be replaced by "bar"?
2844 pub properties: HashMap<String, String>,
2845 }
2846
2847 impl ExtensionOptions for TestExtensionConfig {
2848 fn as_any(&self) -> &dyn Any {
2849 self
2850 }
2851
2852 fn as_any_mut(&mut self) -> &mut dyn Any {
2853 self
2854 }
2855
2856 fn cloned(&self) -> Box<dyn ExtensionOptions> {
2857 Box::new(self.clone())
2858 }
2859
2860 fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
2861 let (key, rem) = key.split_once('.').unwrap_or((key, ""));
2862 assert_eq!(key, "test");
2863 self.properties.insert(rem.to_owned(), value.to_owned());
2864 Ok(())
2865 }
2866
2867 fn entries(&self) -> Vec<ConfigEntry> {
2868 self.properties
2869 .iter()
2870 .map(|(k, v)| ConfigEntry {
2871 key: k.into(),
2872 value: Some(v.into()),
2873 description: "",
2874 })
2875 .collect()
2876 }
2877 }
2878
2879 impl ConfigExtension for TestExtensionConfig {
2880 const PREFIX: &'static str = "test";
2881 }
2882
2883 #[test]
2884 fn create_table_config() {
2885 let mut extension = Extensions::new();
2886 extension.insert(TestExtensionConfig::default());
2887 let table_config = TableOptions::new().with_extensions(extension);
        let test_config = table_config.extensions.get::<TestExtensionConfig>();
        assert!(test_config.is_some())
2890 }
2891
2892 #[test]
2893 fn alter_test_extension_config() {
2894 let mut extension = Extensions::new();
2895 extension.insert(TestExtensionConfig::default());
2896 let mut table_config = TableOptions::new().with_extensions(extension);
2897 table_config.set_config_format(ConfigFileType::CSV);
2898 table_config.set("format.delimiter", ";").unwrap();
2899 assert_eq!(table_config.csv.delimiter, b';');
2900 table_config.set("test.bootstrap.servers", "asd").unwrap();
        let test_config = table_config
2902 .extensions
2903 .get::<TestExtensionConfig>()
2904 .unwrap();
2905 assert_eq!(
            test_config.properties.get("bootstrap.servers").unwrap(),
2907 "asd"
2908 );
2909 }
2910
2911 #[test]
2912 fn csv_u8_table_options() {
2913 let mut table_config = TableOptions::new();
2914 table_config.set_config_format(ConfigFileType::CSV);
2915 table_config.set("format.delimiter", ";").unwrap();
2916 assert_eq!(table_config.csv.delimiter as char, ';');
2917 table_config.set("format.escape", "\"").unwrap();
2918 assert_eq!(table_config.csv.escape.unwrap() as char, '"');
2919 table_config.set("format.escape", "\'").unwrap();
2920 assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
2921 }
2922
2923 #[test]
2924 fn warning_only_not_default() {
2925 use std::sync::atomic::AtomicUsize;
2926 static COUNT: AtomicUsize = AtomicUsize::new(0);
2927 use log::{Level, LevelFilter, Metadata, Record};
2928 struct SimpleLogger;
2929 impl log::Log for SimpleLogger {
2930 fn enabled(&self, metadata: &Metadata) -> bool {
2931 metadata.level() <= Level::Info
2932 }
2933
2934 fn log(&self, record: &Record) {
2935 if self.enabled(record.metadata()) {
2936 COUNT.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
2937 }
2938 }
2939 fn flush(&self) {}
2940 }
2941 log::set_logger(&SimpleLogger).unwrap();
2942 log::set_max_level(LevelFilter::Info);
2943 let mut sql_parser_options = crate::config::SqlParserOptions::default();
2944 sql_parser_options
2945 .set("enable_options_value_normalization", "false")
2946 .unwrap();
2947 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 0);
2948 sql_parser_options
2949 .set("enable_options_value_normalization", "true")
2950 .unwrap();
2951 assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1);
2952 }
2953
2954 #[cfg(feature = "parquet")]
2955 #[test]
2956 fn parquet_table_options() {
2957 let mut table_config = TableOptions::new();
2958 table_config.set_config_format(ConfigFileType::PARQUET);
2959 table_config
2960 .set("format.bloom_filter_enabled::col1", "true")
2961 .unwrap();
2962 assert_eq!(
2963 table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
2964 Some(true)
2965 );
2966 }
2967
2968 #[cfg(feature = "parquet_encryption")]
2969 #[test]
2970 fn parquet_table_encryption() {
2971 use crate::config::{
2972 ConfigFileDecryptionProperties, ConfigFileEncryptionProperties,
2973 };
2974 use parquet::encryption::decrypt::FileDecryptionProperties;
2975 use parquet::encryption::encrypt::FileEncryptionProperties;
2976
2977 let footer_key = b"0123456789012345".to_vec(); // 128bit/16
2978 let column_names = vec!["double_field", "float_field"];
2979 let column_keys =
2980 vec![b"1234567890123450".to_vec(), b"1234567890123451".to_vec()];
2981
2982 let file_encryption_properties =
2983 FileEncryptionProperties::builder(footer_key.clone())
2984 .with_column_keys(column_names.clone(), column_keys.clone())
2985 .unwrap()
2986 .build()
2987 .unwrap();
2988
2989 let decryption_properties = FileDecryptionProperties::builder(footer_key.clone())
2990 .with_column_keys(column_names.clone(), column_keys.clone())
2991 .unwrap()
2992 .build()
2993 .unwrap();
2994
2995 // Test round-trip
2996 let config_encrypt =
2997 ConfigFileEncryptionProperties::from(&file_encryption_properties);
2998 let encryption_properties_built =
2999 Arc::new(FileEncryptionProperties::from(config_encrypt.clone()));
3000 assert_eq!(file_encryption_properties, encryption_properties_built);
3001
3002 let config_decrypt = ConfigFileDecryptionProperties::from(&decryption_properties);
3003 let decryption_properties_built =
3004 Arc::new(FileDecryptionProperties::from(config_decrypt.clone()));
3005 assert_eq!(decryption_properties, decryption_properties_built);
3006
3007 ///////////////////////////////////////////////////////////////////////////////////
3008 // Test encryption config
3009
3010 // Display original encryption config
3011 // println!("{:#?}", config_encrypt);
3012
3013 let mut table_config = TableOptions::new();
3014 table_config.set_config_format(ConfigFileType::PARQUET);
3015 table_config
3016 .parquet
3017 .set(
3018 "crypto.file_encryption.encrypt_footer",
3019 config_encrypt.encrypt_footer.to_string().as_str(),
3020 )
3021 .unwrap();
3022 table_config
3023 .parquet
3024 .set(
3025 "crypto.file_encryption.footer_key_as_hex",
3026 config_encrypt.footer_key_as_hex.as_str(),
3027 )
3028 .unwrap();
3029
        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_encryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final encryption config
        // println!("{:#?}", table_config.parquet.crypto.file_encryption);

        assert_eq!(
            table_config.parquet.crypto.file_encryption,
            Some(config_encrypt)
        );

        ///////////////////////////////////////////////////////////////////////////////////
        // Test decryption config

        // Display original decryption config
        // println!("{:#?}", config_decrypt);

        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .parquet
            .set(
                "crypto.file_decryption.footer_key_as_hex",
                config_decrypt.footer_key_as_hex.as_str(),
            )
            .unwrap();

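        // The decryption side accepts the same hex-encoded per-column keys,
        // scoped under `crypto.file_decryption` instead of `file_encryption`.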
        for (i, col_name) in column_names.iter().enumerate() {
            let key = format!("crypto.file_decryption.column_key_as_hex::{col_name}");
            let value = hex::encode(column_keys[i].clone());
            table_config
                .parquet
                .set(key.as_str(), value.as_str())
                .unwrap();
        }

        // Print matching final decryption config
        // println!("{:#?}", table_config.parquet.crypto.file_decryption);

        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );

        // Set the config directly on the struct, bypassing the string-based API
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.parquet.crypto.file_decryption = Some(config_decrypt.clone());
        assert_eq!(
            table_config.parquet.crypto.file_decryption,
            Some(config_decrypt.clone())
        );
    }

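    // `configure_factory` should record the encryption factory id and copy
    // the extension's key/value pairs into the factory options.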
    #[cfg(feature = "parquet_encryption")]
    #[test]
    fn parquet_encryption_factory_config() {
        let mut parquet_options = TableParquetOptions::default();

        assert_eq!(parquet_options.crypto.factory_id, None);
        assert_eq!(parquet_options.crypto.factory_options.options.len(), 0);

        let mut input_config = TestExtensionConfig::default();
        input_config
            .properties
            .insert("key1".to_string(), "value 1".to_string());
        input_config
            .properties
            .insert("key2".to_string(), "value 2".to_string());

        parquet_options
            .crypto
            .configure_factory("example_factory", &input_config);

        assert_eq!(
            parquet_options.crypto.factory_id,
            Some("example_factory".to_string())
        );
        let factory_options = &parquet_options.crypto.factory_options.options;
        assert_eq!(factory_options.len(), 2);
        assert_eq!(factory_options.get("key1"), Some(&"value 1".to_string()));
        assert_eq!(factory_options.get("key2"), Some(&"value 2".to_string()));
    }

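    // Options set through `TableOptions` should be listed back by `entries()`,
    // including column-specific keys.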
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config
            .set("format.bloom_filter_enabled::col1", "true")
            .unwrap();
        let entries = table_config.entries();
        assert!(entries
            .iter()
            .any(|item| item.key == "format.bloom_filter_enabled::col1"));
    }

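    // The same holds when setting `TableParquetOptions` directly: per-column
    // encryption keys appear under their full `::<column>` key.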
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_parquet_options_config_entry() {
        let mut table_parquet_options = TableParquetOptions::new();
        table_parquet_options
            .set(
                "crypto.file_encryption.column_key_as_hex::double_field",
                "31323334353637383930313233343530",
            )
            .unwrap();
        let entries = table_parquet_options.entries();
        assert!(entries
            .iter()
            .any(|item| item.key
                == "crypto.file_encryption.column_key_as_hex::double_field"));
    }

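    // `format.metadata::<key>` values are stored verbatim: empty strings,
    // trailing whitespace, and `::` inside the value must all survive.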
    #[cfg(feature = "parquet")]
    #[test]
    fn parquet_table_options_config_metadata_entry() {
        let mut table_config = TableOptions::new();
        table_config.set_config_format(ConfigFileType::PARQUET);
        table_config.set("format.metadata::key1", "").unwrap();
        table_config.set("format.metadata::key2", "value2").unwrap();
        table_config
            .set("format.metadata::key3", "value with spaces ")
            .unwrap();
        table_config
            .set("format.metadata::key4", "value with special chars :: :")
            .unwrap();

        let parsed_metadata = table_config.parquet.key_value_metadata.clone();
        assert_eq!(parsed_metadata.get("should not exist1"), None);
        assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
        assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
        assert_eq!(
            parsed_metadata.get("key3"),
            Some(&Some("value with spaces ".into()))
        );
        assert_eq!(
            parsed_metadata.get("key4"),
            Some(&Some("value with special chars :: :".into()))
        );

        // Duplicate keys are overwritten; the last value set wins
        table_config.set("format.metadata::key_dupe", "A").unwrap();
        table_config.set("format.metadata::key_dupe", "B").unwrap();
        let parsed_metadata = table_config.parquet.key_value_metadata;
        assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
    }
}