datafusion_sql/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST)
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::vec;
23
24use crate::utils::make_decimal_type;
25use arrow::datatypes::*;
26use datafusion_common::config::SqlParserOptions;
27use datafusion_common::datatype::{DataTypeExt, FieldExt};
28use datafusion_common::error::add_possible_columns_to_diag;
29use datafusion_common::TableReference;
30use datafusion_common::{
31    field_not_found, plan_datafusion_err, DFSchemaRef, Diagnostic, HashSet, SchemaError,
32};
33use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
34use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
35pub use datafusion_expr::planner::ContextProvider;
36use datafusion_expr::{col, Expr};
37use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
38use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
39use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};
40
41/// SQL parser options
42#[derive(Debug, Clone, Copy)]
43pub struct ParserOptions {
44    /// Whether to parse float as decimal.
45    pub parse_float_as_decimal: bool,
46    /// Whether to normalize identifiers.
47    pub enable_ident_normalization: bool,
48    /// Whether to support varchar with length.
49    pub support_varchar_with_length: bool,
50    /// Whether to normalize options value.
51    pub enable_options_value_normalization: bool,
52    /// Whether to collect spans
53    pub collect_spans: bool,
54    /// Whether string types (VARCHAR, CHAR, Text, and String) are mapped to `Utf8View` during SQL planning.
55    pub map_string_types_to_utf8view: bool,
56    /// Default null ordering for sorting expressions.
57    pub default_null_ordering: NullOrdering,
58}
59
60impl ParserOptions {
61    /// Creates a new `ParserOptions` instance with default values.
62    ///
63    /// # Examples
64    ///
65    /// ```
66    /// use datafusion_sql::planner::ParserOptions;
67    /// let opts = ParserOptions::new();
68    /// assert_eq!(opts.parse_float_as_decimal, false);
69    /// assert_eq!(opts.enable_ident_normalization, true);
70    /// ```
71    pub fn new() -> Self {
72        Self {
73            parse_float_as_decimal: false,
74            enable_ident_normalization: true,
75            support_varchar_with_length: true,
76            map_string_types_to_utf8view: true,
77            enable_options_value_normalization: false,
78            collect_spans: false,
79            // By default, `nulls_max` is used to follow Postgres's behavior.
80            // postgres rule: https://www.postgresql.org/docs/current/queries-order.html
81            default_null_ordering: NullOrdering::NullsMax,
82        }
83    }
84
85    /// Sets the `parse_float_as_decimal` option.
86    ///
87    /// # Examples
88    ///
89    /// ```
90    /// use datafusion_sql::planner::ParserOptions;
91    /// let opts = ParserOptions::new().with_parse_float_as_decimal(true);
92    /// assert_eq!(opts.parse_float_as_decimal, true);
93    /// ```
94    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
95        self.parse_float_as_decimal = value;
96        self
97    }
98
99    /// Sets the `enable_ident_normalization` option.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use datafusion_sql::planner::ParserOptions;
105    /// let opts = ParserOptions::new().with_enable_ident_normalization(false);
106    /// assert_eq!(opts.enable_ident_normalization, false);
107    /// ```
108    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
109        self.enable_ident_normalization = value;
110        self
111    }
112
113    /// Sets the `support_varchar_with_length` option.
114    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
115        self.support_varchar_with_length = value;
116        self
117    }
118
119    /// Sets the `map_string_types_to_utf8view` option.
120    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
121        self.map_string_types_to_utf8view = value;
122        self
123    }
124
125    /// Sets the `enable_options_value_normalization` option.
126    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
127        self.enable_options_value_normalization = value;
128        self
129    }
130
131    /// Sets the `collect_spans` option.
132    pub fn with_collect_spans(mut self, value: bool) -> Self {
133        self.collect_spans = value;
134        self
135    }
136
137    /// Sets the `default_null_ordering` option.
138    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
139        self.default_null_ordering = value;
140        self
141    }
142}
143
144impl Default for ParserOptions {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl From<&SqlParserOptions> for ParserOptions {
151    fn from(options: &SqlParserOptions) -> Self {
152        Self {
153            parse_float_as_decimal: options.parse_float_as_decimal,
154            enable_ident_normalization: options.enable_ident_normalization,
155            support_varchar_with_length: options.support_varchar_with_length,
156            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
157            enable_options_value_normalization: options
158                .enable_options_value_normalization,
159            collect_spans: options.collect_spans,
160            default_null_ordering: options.default_null_ordering.as_str().into(),
161        }
162    }
163}
164
165/// Represents the null ordering for sorting expressions.
166#[derive(Debug, Clone, Copy)]
167pub enum NullOrdering {
168    /// Nulls appear last in ascending order.
169    NullsMax,
170    /// Nulls appear first in descending order.
171    NullsMin,
172    /// Nulls appear first.
173    NullsFirst,
174    /// Nulls appear last.
175    NullsLast,
176}
177
178impl NullOrdering {
179    /// Evaluates the null ordering based on the given ascending flag.
180    ///
181    /// # Returns
182    /// * `true` if nulls should appear first.
183    /// * `false` if nulls should appear last.
184    pub fn nulls_first(&self, asc: bool) -> bool {
185        match self {
186            Self::NullsMax => !asc,
187            Self::NullsMin => asc,
188            Self::NullsFirst => true,
189            Self::NullsLast => false,
190        }
191    }
192}
193
194impl FromStr for NullOrdering {
195    type Err = DataFusionError;
196
197    fn from_str(s: &str) -> Result<Self> {
198        match s {
199            "nulls_max" => Ok(Self::NullsMax),
200            "nulls_min" => Ok(Self::NullsMin),
201            "nulls_first" => Ok(Self::NullsFirst),
202            "nulls_last" => Ok(Self::NullsLast),
203            _ => plan_err!("Unknown null ordering: Expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max'. Got {s}"),
204        }
205    }
206}
207
208impl From<&str> for NullOrdering {
209    fn from(s: &str) -> Self {
210        Self::from_str(s).unwrap_or(Self::NullsMax)
211    }
212}
213
214/// Ident Normalizer
215#[derive(Debug)]
216pub struct IdentNormalizer {
217    normalize: bool,
218}
219
220impl Default for IdentNormalizer {
221    fn default() -> Self {
222        Self { normalize: true }
223    }
224}
225
226impl IdentNormalizer {
227    pub fn new(normalize: bool) -> Self {
228        Self { normalize }
229    }
230
231    pub fn normalize(&self, ident: Ident) -> String {
232        if self.normalize {
233            crate::utils::normalize_ident(ident)
234        } else {
235            ident.value
236        }
237    }
238}
239
240/// Struct to store the states used by the Planner. The Planner will leverage the states
241/// to resolve CTEs, Views, subqueries and PREPARE statements. The states include
242/// Common Table Expression (CTE) provided with WITH clause and
243/// Parameter Data Types provided with PREPARE statement and the query schema of the
244/// outer query plan.
245///
246/// # Cloning
247///
248/// Only the `ctes` are truly cloned when the `PlannerContext` is cloned.
249/// This helps resolve scoping issues of CTEs.
250/// By using cloning, a subquery can inherit CTEs from the outer query
251/// and can also define its own private CTEs without affecting the outer query.
252///
253#[derive(Debug, Clone)]
254pub struct PlannerContext {
255    /// Data types for numbered parameters ($1, $2, etc), if supplied
256    /// in `PREPARE` statement
257    prepare_param_data_types: Arc<Vec<FieldRef>>,
258    /// Map of CTE name to logical plan of the WITH clause.
259    /// Use `Arc<LogicalPlan>` to allow cheap cloning
260    ctes: HashMap<String, Arc<LogicalPlan>>,
261    /// The query schema of the outer query plan, used to resolve the columns in subquery
262    outer_query_schema: Option<DFSchemaRef>,
263    /// The joined schemas of all FROM clauses planned so far. When planning LATERAL
264    /// FROM clauses, this should become a suffix of the `outer_query_schema`.
265    outer_from_schema: Option<DFSchemaRef>,
266    /// The query schema defined by the table
267    create_table_schema: Option<DFSchemaRef>,
268    /// The lambda introduced columns names
269    lambdas_parameters: HashSet<String>,
270}
271
272impl Default for PlannerContext {
273    fn default() -> Self {
274        Self::new()
275    }
276}
277
278impl PlannerContext {
279    /// Create an empty PlannerContext
280    pub fn new() -> Self {
281        Self {
282            prepare_param_data_types: Arc::new(vec![]),
283            ctes: HashMap::new(),
284            outer_query_schema: None,
285            outer_from_schema: None,
286            create_table_schema: None,
287            lambdas_parameters: HashSet::new(),
288        }
289    }
290
291    /// Update the PlannerContext with provided prepare_param_data_types
292    pub fn with_prepare_param_data_types(
293        mut self,
294        prepare_param_data_types: Vec<FieldRef>,
295    ) -> Self {
296        self.prepare_param_data_types = prepare_param_data_types.into();
297        self
298    }
299
300    // Return a reference to the outer query's schema
301    pub fn outer_query_schema(&self) -> Option<&DFSchema> {
302        self.outer_query_schema.as_ref().map(|s| s.as_ref())
303    }
304
305    /// Sets the outer query schema, returning the existing one, if
306    /// any
307    pub fn set_outer_query_schema(
308        &mut self,
309        mut schema: Option<DFSchemaRef>,
310    ) -> Option<DFSchemaRef> {
311        std::mem::swap(&mut self.outer_query_schema, &mut schema);
312        schema
313    }
314
315    pub fn set_table_schema(
316        &mut self,
317        mut schema: Option<DFSchemaRef>,
318    ) -> Option<DFSchemaRef> {
319        std::mem::swap(&mut self.create_table_schema, &mut schema);
320        schema
321    }
322
323    pub fn table_schema(&self) -> Option<DFSchemaRef> {
324        self.create_table_schema.clone()
325    }
326
327    // Return a clone of the outer FROM schema
328    pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
329        self.outer_from_schema.clone()
330    }
331
332    /// Sets the outer FROM schema, returning the existing one, if any
333    pub fn set_outer_from_schema(
334        &mut self,
335        mut schema: Option<DFSchemaRef>,
336    ) -> Option<DFSchemaRef> {
337        std::mem::swap(&mut self.outer_from_schema, &mut schema);
338        schema
339    }
340
341    /// Extends the FROM schema, returning the existing one, if any
342    pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
343        match self.outer_from_schema.as_mut() {
344            Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
345            None => self.outer_from_schema = Some(Arc::clone(schema)),
346        };
347        Ok(())
348    }
349
350    /// Return the types of parameters (`$1`, `$2`, etc) if known
351    pub fn prepare_param_data_types(&self) -> &[FieldRef] {
352        &self.prepare_param_data_types
353    }
354
355    /// Returns true if there is a Common Table Expression (CTE) /
356    /// Subquery for the specified name
357    pub fn contains_cte(&self, cte_name: &str) -> bool {
358        self.ctes.contains_key(cte_name)
359    }
360
361    /// Inserts a LogicalPlan for the Common Table Expression (CTE) /
362    /// Subquery for the specified name
363    pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
364        let cte_name = cte_name.into();
365        self.ctes.insert(cte_name, Arc::new(plan));
366    }
367
368    /// Return a plan for the Common Table Expression (CTE) / Subquery for the
369    /// specified name
370    pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
371        self.ctes.get(cte_name).map(|cte| cte.as_ref())
372    }
373
374    pub fn lambdas_parameters(&self) -> &HashSet<String> {
375        &self.lambdas_parameters
376    }
377
378    pub fn with_lambda_parameters(
379        mut self,
380        arguments: impl IntoIterator<Item = String>,
381    ) -> Self {
382        self.lambdas_parameters.extend(arguments);
383
384        self
385    }
386
387    /// Remove the plan of CTE / Subquery for the specified name
388    pub(super) fn remove_cte(&mut self, cte_name: &str) {
389        self.ctes.remove(cte_name);
390    }
391}
392
393/// SQL query planner and binder
394///
395/// This struct is used to convert a SQL AST into a [`LogicalPlan`].
396///
397/// You can control the behavior of the planner by providing [`ParserOptions`].
398///
399/// It performs the following tasks:
400///
401/// 1. Name and type resolution (called "binding" in other systems). This
402///    phase looks up table and column names using the [`ContextProvider`].
403/// 2. Mechanical translation of the AST into a [`LogicalPlan`].
404///
405/// It does not perform type coercion, or perform optimization, which are done
406/// by subsequent passes.
407///
408/// Key interfaces are:
409/// * [`Self::sql_statement_to_plan`]: Convert a statement
410///   (e.g. `SELECT ...`) into a [`LogicalPlan`]
411/// * [`Self::sql_to_expr`]: Convert an expression (e.g. `1 + 2`) into an [`Expr`]
412pub struct SqlToRel<'a, S: ContextProvider> {
413    pub(crate) context_provider: &'a S,
414    pub(crate) options: ParserOptions,
415    pub(crate) ident_normalizer: IdentNormalizer,
416}
417
418impl<'a, S: ContextProvider> SqlToRel<'a, S> {
419    /// Create a new query planner.
420    ///
421    /// The query planner derives the parser options from the context provider.
422    pub fn new(context_provider: &'a S) -> Self {
423        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
424        Self::new_with_options(context_provider, parser_options)
425    }
426
427    /// Create a new query planner with the given parser options.
428    ///
429    /// The query planner ignores the parser options from the context provider
430    /// and uses the given parser options instead.
431    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
432        let ident_normalize = options.enable_ident_normalization;
433
434        SqlToRel {
435            context_provider,
436            options,
437            ident_normalizer: IdentNormalizer::new(ident_normalize),
438        }
439    }
440
441    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
442        let mut fields = Vec::with_capacity(columns.len());
443
444        for column in columns {
445            let data_type = self.convert_data_type_to_field(&column.data_type)?;
446            let not_nullable = column
447                .options
448                .iter()
449                .any(|x| x.option == ColumnOption::NotNull);
450            fields.push(
451                data_type
452                    .as_ref()
453                    .clone()
454                    .with_name(self.ident_normalizer.normalize(column.name))
455                    .with_nullable(!not_nullable),
456            );
457        }
458
459        Ok(Schema::new(fields))
460    }
461
462    /// Returns a vector of (column_name, default_expr) pairs
463    pub(super) fn build_column_defaults(
464        &self,
465        columns: &Vec<SQLColumnDef>,
466        planner_context: &mut PlannerContext,
467    ) -> Result<Vec<(String, Expr)>> {
468        let mut column_defaults = vec![];
469        // Default expressions are restricted, column references are not allowed
470        let empty_schema = DFSchema::empty();
471        let error_desc = |e: DataFusionError| match e {
472            DataFusionError::SchemaError(ref err, _)
473                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
474            {
475                plan_datafusion_err!(
476                    "Column reference is not allowed in the DEFAULT expression : {}",
477                    e
478                )
479            }
480            _ => e,
481        };
482
483        for column in columns {
484            if let Some(default_sql_expr) =
485                column.options.iter().find_map(|o| match &o.option {
486                    ColumnOption::Default(expr) => Some(expr),
487                    _ => None,
488                })
489            {
490                let default_expr = self
491                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
492                    .map_err(error_desc)?;
493                column_defaults.push((
494                    self.ident_normalizer.normalize(column.name.clone()),
495                    default_expr,
496                ));
497            }
498        }
499        Ok(column_defaults)
500    }
501
502    /// Apply the given TableAlias to the input plan
503    pub(crate) fn apply_table_alias(
504        &self,
505        plan: LogicalPlan,
506        alias: TableAlias,
507    ) -> Result<LogicalPlan> {
508        let idents = alias.columns.into_iter().map(|c| c.name).collect();
509        let plan = self.apply_expr_alias(plan, idents)?;
510
511        LogicalPlanBuilder::from(plan)
512            .alias(TableReference::bare(
513                self.ident_normalizer.normalize(alias.name),
514            ))?
515            .build()
516    }
517
518    pub(crate) fn apply_expr_alias(
519        &self,
520        plan: LogicalPlan,
521        idents: Vec<Ident>,
522    ) -> Result<LogicalPlan> {
523        if idents.is_empty() {
524            Ok(plan)
525        } else if idents.len() != plan.schema().fields().len() {
526            plan_err!(
527                "Source table contains {} columns but only {} \
528                names given as column alias",
529                plan.schema().fields().len(),
530                idents.len()
531            )
532        } else {
533            let fields = plan.schema().fields().clone();
534            LogicalPlanBuilder::from(plan)
535                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
536                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
537                }))?
538                .build()
539        }
540    }
541
542    /// Validate the schema provides all of the columns referenced in the expressions.
543    pub(crate) fn validate_schema_satisfies_exprs(
544        &self,
545        schema: &DFSchema,
546        exprs: &[Expr],
547    ) -> Result<()> {
548        exprs
549            .iter()
550            .flat_map(|expr| expr.column_refs())
551            .try_for_each(|col| {
552                match &col.relation {
553                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
554                    None => {
555                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
556                            Ok(())
557                        } else {
558                            Err(field_not_found(
559                                col.relation.clone(),
560                                col.name.as_str(),
561                                schema,
562                            ))
563                        }
564                    }
565                }
566                .map_err(|err: DataFusionError| match &err {
567                    DataFusionError::SchemaError(inner, _)
568                        if matches!(
569                            inner.as_ref(),
570                            SchemaError::FieldNotFound { .. }
571                        ) =>
572                    {
573                        let SchemaError::FieldNotFound {
574                            field,
575                            valid_fields,
576                        } = inner.as_ref()
577                        else {
578                            unreachable!()
579                        };
580                        let mut diagnostic = if let Some(relation) = &col.relation {
581                            Diagnostic::new_error(
582                                format!(
583                                    "column '{}' not found in '{}'",
584                                    &col.name, relation
585                                ),
586                                col.spans().first(),
587                            )
588                        } else {
589                            Diagnostic::new_error(
590                                format!("column '{}' not found", &col.name),
591                                col.spans().first(),
592                            )
593                        };
594                        add_possible_columns_to_diag(
595                            &mut diagnostic,
596                            field,
597                            valid_fields,
598                        );
599                        err.with_diagnostic(diagnostic)
600                    }
601                    _ => err,
602                })
603            })
604    }
605
606    pub(crate) fn convert_data_type_to_field(
607        &self,
608        sql_type: &SQLDataType,
609    ) -> Result<FieldRef> {
610        // First check if any of the registered type_planner can handle this type
611        if let Some(type_planner) = self.context_provider.get_type_planner() {
612            if let Some(data_type) = type_planner.plan_type(sql_type)? {
613                return Ok(data_type.into_nullable_field_ref());
614            }
615        }
616
617        // If no type_planner can handle this type, use the default conversion
618        match sql_type {
619            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
620                // Arrays may be multi-dimensional.
621                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
622            }
623            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
624                inner_sql_type,
625                maybe_array_size,
626            )) => {
627                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
628                if let Some(array_size) = maybe_array_size {
629                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
630                        plan_datafusion_err!(
631                            "Array size must be a positive 32 bit integer, got {array_size}"
632                        )
633                    })?;
634                    Ok(inner_field.into_fixed_size_list(array_size))
635                } else {
636                    Ok(inner_field.into_list())
637                }
638            }
639            SQLDataType::Array(ArrayElemTypeDef::None) => {
640                not_impl_err!("Arrays with unspecified type is not supported")
641            }
642            other => Ok(self
643                .convert_simple_data_type(other)?
644                .into_nullable_field_ref()),
645        }
646    }
647
648    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
649        match sql_type {
650            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
651            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
652            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
653            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
654                Ok(DataType::Int32)
655            }
656            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
657            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
658            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
659                Ok(DataType::UInt16)
660            }
661            SQLDataType::IntUnsigned(_)
662            | SQLDataType::IntegerUnsigned(_)
663            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
664            SQLDataType::Varchar(length) => {
665                match (length, self.options.support_varchar_with_length) {
666                    (Some(_), false) => plan_err!(
667                        "does not support Varchar with length, \
668                    please set `support_varchar_with_length` to be true"
669                    ),
670                    _ => {
671                        if self.options.map_string_types_to_utf8view {
672                            Ok(DataType::Utf8View)
673                        } else {
674                            Ok(DataType::Utf8)
675                        }
676                    }
677                }
678            }
679            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
680                Ok(DataType::UInt64)
681            }
682            SQLDataType::Float(_) => Ok(DataType::Float32),
683            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
684            SQLDataType::Double(ExactNumberInfo::None)
685            | SQLDataType::DoublePrecision
686            | SQLDataType::Float8 => Ok(DataType::Float64),
687            SQLDataType::Double(
688                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
689            ) => {
690                not_impl_err!(
691                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
692                )
693            }
694            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
695                if self.options.map_string_types_to_utf8view {
696                    Ok(DataType::Utf8View)
697                } else {
698                    Ok(DataType::Utf8)
699                }
700            }
701            SQLDataType::Timestamp(precision, tz_info)
702                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
703            {
704                let tz = if matches!(tz_info, TimezoneInfo::Tz)
705                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
706                {
707                    // Timestamp With Time Zone
708                    // INPUT : [SQLDataType]   TimestampTz + [Config] Time Zone
709                    // OUTPUT: [ArrowDataType] Timestamp<TimeUnit, Some(Time Zone)>
710                    self.context_provider.options().execution.time_zone.clone()
711                } else {
712                    // Timestamp Without Time zone
713                    None
714                };
715                let precision = match precision {
716                    Some(0) => TimeUnit::Second,
717                    Some(3) => TimeUnit::Millisecond,
718                    Some(6) => TimeUnit::Microsecond,
719                    None | Some(9) => TimeUnit::Nanosecond,
720                    _ => unreachable!(),
721                };
722                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
723            }
724            SQLDataType::Date => Ok(DataType::Date32),
725            SQLDataType::Time(None, tz_info) => {
726                if matches!(tz_info, TimezoneInfo::None)
727                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
728                {
729                    Ok(DataType::Time64(TimeUnit::Nanosecond))
730                } else {
731                    // We don't support TIMETZ and TIME WITH TIME ZONE for now
732                    not_impl_err!("Unsupported SQL type {sql_type}")
733                }
734            }
735            SQLDataType::Numeric(exact_number_info)
736            | SQLDataType::Decimal(exact_number_info) => {
737                let (precision, scale) = match *exact_number_info {
738                    ExactNumberInfo::None => (None, None),
739                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
740                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
741                        (Some(precision), Some(scale))
742                    }
743                };
744                make_decimal_type(precision, scale.map(|s| s as u64))
745            }
746            SQLDataType::Bytea => Ok(DataType::Binary),
747            SQLDataType::Interval { fields, precision } => {
748                if fields.is_some() || precision.is_some() {
749                    return not_impl_err!("Unsupported SQL type {sql_type}");
750                }
751                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
752            }
753            SQLDataType::Struct(fields, _) => {
754                let fields = fields
755                    .iter()
756                    .enumerate()
757                    .map(|(idx, sql_struct_field)| {
758                        let field = self.convert_data_type_to_field(&sql_struct_field.field_type)?;
759                        let field_name = match &sql_struct_field.field_name {
760                            Some(ident) => ident.clone(),
761                            None => Ident::new(format!("c{idx}")),
762                        };
763                        Ok(field.as_ref().clone().with_name(self.ident_normalizer.normalize(field_name)))
764                    })
765                    .collect::<Result<Vec<_>>>()?;
766                Ok(DataType::Struct(Fields::from(fields)))
767            }
768            SQLDataType::Nvarchar(_)
769            | SQLDataType::JSON
770            | SQLDataType::Uuid
771            | SQLDataType::Binary(_)
772            | SQLDataType::Varbinary(_)
773            | SQLDataType::Blob(_)
774            | SQLDataType::Datetime(_)
775            | SQLDataType::Regclass
776            | SQLDataType::Custom(_, _)
777            | SQLDataType::Array(_)
778            | SQLDataType::Enum(_, _)
779            | SQLDataType::Set(_)
780            | SQLDataType::MediumInt(_)
781            | SQLDataType::MediumIntUnsigned(_)
782            | SQLDataType::Character(_)
783            | SQLDataType::CharacterVarying(_)
784            | SQLDataType::CharVarying(_)
785            | SQLDataType::CharacterLargeObject(_)
786            | SQLDataType::CharLargeObject(_)
787            | SQLDataType::Timestamp(_, _)
788            | SQLDataType::Time(Some(_), _)
789            | SQLDataType::Dec(_)
790            | SQLDataType::BigNumeric(_)
791            | SQLDataType::BigDecimal(_)
792            | SQLDataType::Clob(_)
793            | SQLDataType::Bytes(_)
794            | SQLDataType::Int64
795            | SQLDataType::Float64
796            | SQLDataType::JSONB
797            | SQLDataType::Unspecified
798            | SQLDataType::Int16
799            | SQLDataType::Int32
800            | SQLDataType::Int128
801            | SQLDataType::Int256
802            | SQLDataType::UInt8
803            | SQLDataType::UInt16
804            | SQLDataType::UInt32
805            | SQLDataType::UInt64
806            | SQLDataType::UInt128
807            | SQLDataType::UInt256
808            | SQLDataType::Float32
809            | SQLDataType::Date32
810            | SQLDataType::Datetime64(_, _)
811            | SQLDataType::FixedString(_)
812            | SQLDataType::Map(_, _)
813            | SQLDataType::Tuple(_)
814            | SQLDataType::Nested(_)
815            | SQLDataType::Union(_)
816            | SQLDataType::Nullable(_)
817            | SQLDataType::LowCardinality(_)
818            | SQLDataType::Trigger
819            | SQLDataType::TinyBlob
820            | SQLDataType::MediumBlob
821            | SQLDataType::LongBlob
822            | SQLDataType::TinyText
823            | SQLDataType::MediumText
824            | SQLDataType::LongText
825            | SQLDataType::Bit(_)
826            | SQLDataType::BitVarying(_)
827            | SQLDataType::Signed
828            | SQLDataType::SignedInteger
829            | SQLDataType::Unsigned
830            | SQLDataType::UnsignedInteger
831            | SQLDataType::AnyType
832            | SQLDataType::Table(_)
833            | SQLDataType::VarBit(_)
834            | SQLDataType::UTinyInt
835            | SQLDataType::USmallInt
836            | SQLDataType::HugeInt
837            | SQLDataType::UHugeInt
838            | SQLDataType::UBigInt
839            | SQLDataType::TimestampNtz
840            | SQLDataType::NamedTable { .. }
841            | SQLDataType::TsVector
842            | SQLDataType::TsQuery
843            | SQLDataType::GeometricType(_)
844            | SQLDataType::DecimalUnsigned(_) // deprecated mysql type
845            | SQLDataType::FloatUnsigned(_) // deprecated mysql type
846            | SQLDataType::RealUnsigned // deprecated mysql type
847            | SQLDataType::DecUnsigned(_) // deprecated mysql type
848            | SQLDataType::DoubleUnsigned(_) // deprecated mysql type
849            | SQLDataType::DoublePrecisionUnsigned // deprecated mysql type
850            => {
851                not_impl_err!("Unsupported SQL type {sql_type}")
852            }
853        }
854    }
855
856    pub(crate) fn object_name_to_table_reference(
857        &self,
858        object_name: ObjectName,
859    ) -> Result<TableReference> {
860        object_name_to_table_reference(
861            object_name,
862            self.options.enable_ident_normalization,
863        )
864    }
865}
866
867/// Create a [`TableReference`] after normalizing the specified ObjectName
868///
869/// Examples
870/// ```text
871/// ['foo']          -> Bare { table: "foo" }
872/// ['"foo.bar"]]    -> Bare { table: "foo.bar" }
873/// ['foo', 'Bar']   -> Partial { schema: "foo", table: "bar" } <-- note lower case "bar"
874/// ['foo', 'bar']   -> Partial { schema: "foo", table: "bar" }
875/// ['foo', '"Bar"'] -> Partial { schema: "foo", table: "Bar" }
876/// ```
877pub fn object_name_to_table_reference(
878    object_name: ObjectName,
879    enable_normalization: bool,
880) -> Result<TableReference> {
881    // Use destructure to make it clear no fields on ObjectName are ignored
882    let ObjectName(object_name_parts) = object_name;
883    let idents = object_name_parts
884        .into_iter()
885        .map(|object_name_part| {
886            object_name_part.as_ident().cloned().ok_or_else(|| {
887                plan_datafusion_err!(
888                    "Expected identifier, but found: {:?}",
889                    object_name_part
890                )
891            })
892        })
893        .collect::<Result<Vec<_>>>()?;
894    idents_to_table_reference(idents, enable_normalization)
895}
896
897struct IdentTaker {
898    normalizer: IdentNormalizer,
899    idents: Vec<Ident>,
900}
901
902/// Take the next identifier from the back of idents, panic'ing if
903/// there are none left
904impl IdentTaker {
905    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
906        Self {
907            normalizer: IdentNormalizer::new(enable_normalization),
908            idents,
909        }
910    }
911
912    fn take(&mut self) -> String {
913        let ident = self.idents.pop().expect("no more identifiers");
914        self.normalizer.normalize(ident)
915    }
916
917    /// Returns the number of remaining identifiers
918    fn len(&self) -> usize {
919        self.idents.len()
920    }
921}
922
923// impl Display for a nicer error message
924impl std::fmt::Display for IdentTaker {
925    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
926        let mut first = true;
927        for ident in self.idents.iter() {
928            if !first {
929                write!(f, ".")?;
930            }
931            write!(f, "{ident}")?;
932            first = false;
933        }
934
935        Ok(())
936    }
937}
938
939/// Create a [`TableReference`] after normalizing the specified identifier
940pub(crate) fn idents_to_table_reference(
941    idents: Vec<Ident>,
942    enable_normalization: bool,
943) -> Result<TableReference> {
944    let mut taker = IdentTaker::new(idents, enable_normalization);
945
946    match taker.len() {
947        1 => {
948            let table = taker.take();
949            Ok(TableReference::bare(table))
950        }
951        2 => {
952            let table = taker.take();
953            let schema = taker.take();
954            Ok(TableReference::partial(schema, table))
955        }
956        3 => {
957            let table = taker.take();
958            let schema = taker.take();
959            let catalog = taker.take();
960            Ok(TableReference::full(catalog, schema, table))
961        }
962        _ => plan_err!(
963            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
964            taker,
965            taker.len()
966        ),
967    }
968}
969
970/// Construct a WHERE qualifier suitable for e.g. information_schema filtering
971/// from the provided object identifiers (catalog, schema and table names).
972pub fn object_name_to_qualifier(
973    sql_table_name: &ObjectName,
974    enable_normalization: bool,
975) -> Result<String> {
976    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
977    let normalizer = IdentNormalizer::new(enable_normalization);
978    sql_table_name
979        .0
980        .iter()
981        .rev()
982        .zip(columns)
983        .map(|(object_name_part, column_name)| {
984            object_name_part
985                .as_ident()
986                .map(|ident| {
987                    format!(
988                        r#"{} = '{}'"#,
989                        column_name,
990                        normalizer.normalize(ident.clone())
991                    )
992                })
993                .ok_or_else(|| {
994                    plan_datafusion_err!(
995                        "Expected identifier, but found: {:?}",
996                        object_name_part
997                    )
998                })
999        })
1000        .collect::<Result<Vec<_>>>()
1001        .map(|parts| parts.join(" AND "))
1002}