use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::vec;

use crate::utils::make_decimal_type;
use arrow::datatypes::*;
use datafusion_common::config::SqlParserOptions;
use datafusion_common::datatype::{DataTypeExt, FieldExt};
use datafusion_common::error::add_possible_columns_to_diag;
use datafusion_common::TableReference;
use datafusion_common::{
    field_not_found, plan_datafusion_err, DFSchemaRef, Diagnostic, HashSet, SchemaError,
};
use datafusion_common::{not_impl_err, plan_err, DFSchema, DataFusionError, Result};
use datafusion_expr::logical_plan::{LogicalPlan, LogicalPlanBuilder};
pub use datafusion_expr::planner::ContextProvider;
use datafusion_expr::{col, Expr};
use sqlparser::ast::{ArrayElemTypeDef, ExactNumberInfo, TimezoneInfo};
use sqlparser::ast::{ColumnDef as SQLColumnDef, ColumnOption};
use sqlparser::ast::{DataType as SQLDataType, Ident, ObjectName, TableAlias};

/// Options that control how SQL text is parsed and planned.
#[derive(Debug, Clone, Copy)]
pub struct ParserOptions {
    /// Parse floating point literals as decimal values instead of `Float64`.
    pub parse_float_as_decimal: bool,
    /// Lowercase unquoted identifiers during planning.
    pub enable_ident_normalization: bool,
    /// Allow `VARCHAR` with a length argument; the length itself is not enforced.
    pub support_varchar_with_length: bool,
    /// Apply normalization to the values supplied in key/value `OPTIONS` clauses.
    pub enable_options_value_normalization: bool,
    /// Record source spans from the SQL text for use in diagnostics.
    pub collect_spans: bool,
    /// Map SQL string types to `Utf8View` instead of `Utf8`.
    pub map_string_types_to_utf8view: bool,
    /// Where NULLs are placed in `ORDER BY` when no explicit ordering is given.
    pub default_null_ordering: NullOrdering,
}

impl ParserOptions {
    /// Creates a new `ParserOptions` with default values.
    pub fn new() -> Self {
        Self {
            parse_float_as_decimal: false,
            enable_ident_normalization: true,
            support_varchar_with_length: true,
            map_string_types_to_utf8view: true,
            enable_options_value_normalization: false,
            collect_spans: false,
            default_null_ordering: NullOrdering::NullsMax,
        }
    }

    /// Sets the `parse_float_as_decimal` option.
    pub fn with_parse_float_as_decimal(mut self, value: bool) -> Self {
        self.parse_float_as_decimal = value;
        self
    }

    /// Sets the `enable_ident_normalization` option.
    pub fn with_enable_ident_normalization(mut self, value: bool) -> Self {
        self.enable_ident_normalization = value;
        self
    }

    /// Sets the `support_varchar_with_length` option.
    pub fn with_support_varchar_with_length(mut self, value: bool) -> Self {
        self.support_varchar_with_length = value;
        self
    }

    /// Sets the `map_string_types_to_utf8view` option.
    pub fn with_map_string_types_to_utf8view(mut self, value: bool) -> Self {
        self.map_string_types_to_utf8view = value;
        self
    }

    /// Sets the `enable_options_value_normalization` option.
    pub fn with_enable_options_value_normalization(mut self, value: bool) -> Self {
        self.enable_options_value_normalization = value;
        self
    }

    /// Sets the `collect_spans` option.
    pub fn with_collect_spans(mut self, value: bool) -> Self {
        self.collect_spans = value;
        self
    }

    /// Sets the `default_null_ordering` option.
    pub fn with_default_null_ordering(mut self, value: NullOrdering) -> Self {
        self.default_null_ordering = value;
        self
    }
}

impl Default for ParserOptions {
    fn default() -> Self {
        Self::new()
    }
}

impl From<&SqlParserOptions> for ParserOptions {
    fn from(options: &SqlParserOptions) -> Self {
        Self {
            parse_float_as_decimal: options.parse_float_as_decimal,
            enable_ident_normalization: options.enable_ident_normalization,
            support_varchar_with_length: options.support_varchar_with_length,
            map_string_types_to_utf8view: options.map_string_types_to_utf8view,
            enable_options_value_normalization: options
                .enable_options_value_normalization,
            collect_spans: options.collect_spans,
            default_null_ordering: options.default_null_ordering.as_str().into(),
        }
    }
}
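
// Illustrative usage sketch (not part of the upstream test suite): exercises
// the fluent `with_*` setters defined above and restates the defaults set by
// `ParserOptions::new`.
#[cfg(test)]
mod parser_options_example {
    use super::*;

    #[test]
    fn builds_options_with_fluent_setters() {
        let options = ParserOptions::new()
            .with_parse_float_as_decimal(true)
            .with_default_null_ordering(NullOrdering::NullsFirst);

        assert!(options.parse_float_as_decimal);
        assert!(matches!(
            options.default_null_ordering,
            NullOrdering::NullsFirst
        ));
        // Fields not touched by a setter keep the defaults from `new()`.
        assert!(options.enable_ident_normalization);
        assert!(!options.collect_spans);
    }
}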

/// Controls where NULL values are placed when sorting.
#[derive(Debug, Clone, Copy)]
pub enum NullOrdering {
    /// NULLs sort as the largest value: last in ascending order, first in descending order.
    NullsMax,
    /// NULLs sort as the smallest value: first in ascending order, last in descending order.
    NullsMin,
    /// NULLs always come first, regardless of sort direction.
    NullsFirst,
    /// NULLs always come last, regardless of sort direction.
    NullsLast,
}

impl NullOrdering {
    /// Returns `true` if NULLs should be placed first for the given sort
    /// direction (`asc` is `true` for ascending order).
    pub fn nulls_first(&self, asc: bool) -> bool {
        match self {
            Self::NullsMax => !asc,
            Self::NullsMin => asc,
            Self::NullsFirst => true,
            Self::NullsLast => false,
        }
    }
}

impl FromStr for NullOrdering {
    type Err = DataFusionError;

    fn from_str(s: &str) -> Result<Self> {
        match s {
            "nulls_max" => Ok(Self::NullsMax),
            "nulls_min" => Ok(Self::NullsMin),
            "nulls_first" => Ok(Self::NullsFirst),
            "nulls_last" => Ok(Self::NullsLast),
            _ => plan_err!(
                "Unknown null ordering: expected one of 'nulls_first', 'nulls_last', 'nulls_min' or 'nulls_max', got {s}"
            ),
        }
    }
}

impl From<&str> for NullOrdering {
    /// Falls back to `NullsMax` if the string is not a recognized null ordering.
    fn from(s: &str) -> Self {
        Self::from_str(s).unwrap_or(Self::NullsMax)
    }
}
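
// Illustrative sketch of the `nulls_first` contract above: `NullsMax` treats
// NULL as the largest value (last when ascending, first when descending),
// `NullsMin` is the mirror image, and the explicit variants ignore direction.
#[cfg(test)]
mod null_ordering_example {
    use super::*;

    #[test]
    fn nulls_first_follows_sort_direction() {
        // ORDER BY x ASC under NullsMax places NULLs last ...
        assert!(!NullOrdering::NullsMax.nulls_first(true));
        // ... while ORDER BY x DESC places them first.
        assert!(NullOrdering::NullsMax.nulls_first(false));

        // The explicit variants are independent of the sort direction.
        assert!(NullOrdering::NullsFirst.nulls_first(true));
        assert!(NullOrdering::NullsFirst.nulls_first(false));
        assert!(!NullOrdering::NullsLast.nulls_first(true));
        assert!(!NullOrdering::NullsLast.nulls_first(false));
    }
}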

/// Applies identifier normalization (via `crate::utils::normalize_ident`) when
/// enabled, and passes identifiers through untouched otherwise.
#[derive(Debug)]
pub struct IdentNormalizer {
    normalize: bool,
}

impl Default for IdentNormalizer {
    fn default() -> Self {
        Self { normalize: true }
    }
}

impl IdentNormalizer {
    pub fn new(normalize: bool) -> Self {
        Self { normalize }
    }

    pub fn normalize(&self, ident: Ident) -> String {
        if self.normalize {
            crate::utils::normalize_ident(ident)
        } else {
            ident.value
        }
    }
}
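
// Illustrative sketch, assuming `crate::utils::normalize_ident` applies the
// usual SQL folding (unquoted identifiers are lowercased, quoted identifiers
// keep their original casing).
#[cfg(test)]
mod ident_normalizer_example {
    use super::*;

    #[test]
    fn normalization_can_be_toggled() {
        let normalizing = IdentNormalizer::new(true);
        assert_eq!(normalizing.normalize(Ident::new("MyTable")), "mytable");
        // Quoted identifiers keep their case even with normalization enabled.
        assert_eq!(
            normalizing.normalize(Ident::with_quote('"', "MyTable")),
            "MyTable"
        );

        let passthrough = IdentNormalizer::new(false);
        assert_eq!(passthrough.normalize(Ident::new("MyTable")), "MyTable");
    }
}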

/// Mutable state shared while planning a single SQL statement.
#[derive(Debug, Clone)]
pub struct PlannerContext {
    /// Data types for numbered parameters (`$1`, `$2`, ...) supplied in a `PREPARE` statement.
    prepare_param_data_types: Arc<Vec<FieldRef>>,
    /// Map of CTE name to the logical plan of its `WITH` clause definition.
    ctes: HashMap<String, Arc<LogicalPlan>>,
    /// Schema of the outer query, used to resolve columns in correlated subqueries.
    outer_query_schema: Option<DFSchemaRef>,
    /// Joined schema of the outer query's `FROM` clauses, used for lateral references.
    outer_from_schema: Option<DFSchemaRef>,
    /// Schema of the table being created, if planning a `CREATE TABLE` statement.
    create_table_schema: Option<DFSchemaRef>,
    /// Names of lambda parameters currently in scope.
    lambdas_parameters: HashSet<String>,
}
271
272impl Default for PlannerContext {
273 fn default() -> Self {
274 Self::new()
275 }
276}
277
278impl PlannerContext {
279 pub fn new() -> Self {
281 Self {
282 prepare_param_data_types: Arc::new(vec![]),
283 ctes: HashMap::new(),
284 outer_query_schema: None,
285 outer_from_schema: None,
286 create_table_schema: None,
287 lambdas_parameters: HashSet::new(),
288 }
289 }
290
291 pub fn with_prepare_param_data_types(
293 mut self,
294 prepare_param_data_types: Vec<FieldRef>,
295 ) -> Self {
296 self.prepare_param_data_types = prepare_param_data_types.into();
297 self
298 }
299
300 pub fn outer_query_schema(&self) -> Option<&DFSchema> {
302 self.outer_query_schema.as_ref().map(|s| s.as_ref())
303 }
304
305 pub fn set_outer_query_schema(
308 &mut self,
309 mut schema: Option<DFSchemaRef>,
310 ) -> Option<DFSchemaRef> {
311 std::mem::swap(&mut self.outer_query_schema, &mut schema);
312 schema
313 }
314
315 pub fn set_table_schema(
316 &mut self,
317 mut schema: Option<DFSchemaRef>,
318 ) -> Option<DFSchemaRef> {
319 std::mem::swap(&mut self.create_table_schema, &mut schema);
320 schema
321 }
322
323 pub fn table_schema(&self) -> Option<DFSchemaRef> {
324 self.create_table_schema.clone()
325 }
326
327 pub fn outer_from_schema(&self) -> Option<Arc<DFSchema>> {
329 self.outer_from_schema.clone()
330 }
331
332 pub fn set_outer_from_schema(
334 &mut self,
335 mut schema: Option<DFSchemaRef>,
336 ) -> Option<DFSchemaRef> {
337 std::mem::swap(&mut self.outer_from_schema, &mut schema);
338 schema
339 }
340
341 pub fn extend_outer_from_schema(&mut self, schema: &DFSchemaRef) -> Result<()> {
343 match self.outer_from_schema.as_mut() {
344 Some(from_schema) => Arc::make_mut(from_schema).merge(schema),
345 None => self.outer_from_schema = Some(Arc::clone(schema)),
346 };
347 Ok(())
348 }
349
350 pub fn prepare_param_data_types(&self) -> &[FieldRef] {
352 &self.prepare_param_data_types
353 }
354
355 pub fn contains_cte(&self, cte_name: &str) -> bool {
358 self.ctes.contains_key(cte_name)
359 }
360
361 pub fn insert_cte(&mut self, cte_name: impl Into<String>, plan: LogicalPlan) {
364 let cte_name = cte_name.into();
365 self.ctes.insert(cte_name, Arc::new(plan));
366 }
367
368 pub fn get_cte(&self, cte_name: &str) -> Option<&LogicalPlan> {
371 self.ctes.get(cte_name).map(|cte| cte.as_ref())
372 }
373
374 pub fn lambdas_parameters(&self) -> &HashSet<String> {
375 &self.lambdas_parameters
376 }
377
378 pub fn with_lambda_parameters(
379 mut self,
380 arguments: impl IntoIterator<Item = String>,
381 ) -> Self {
382 self.lambdas_parameters.extend(arguments);
383
384 self
385 }
386
387 pub(super) fn remove_cte(&mut self, cte_name: &str) {
389 self.ctes.remove(cte_name);
390 }
391}
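
// Illustrative sketch of the CTE bookkeeping above. `LogicalPlanBuilder::empty`
// is used only to obtain a trivial plan to register; any `LogicalPlan` works.
#[cfg(test)]
mod planner_context_example {
    use super::*;

    #[test]
    fn registers_and_resolves_ctes() -> Result<()> {
        let mut context = PlannerContext::new();
        assert!(!context.contains_cte("numbers"));

        let plan = LogicalPlanBuilder::empty(false).build()?;
        context.insert_cte("numbers", plan);
        assert!(context.contains_cte("numbers"));
        assert!(context.get_cte("numbers").is_some());

        context.remove_cte("numbers");
        assert!(!context.contains_cte("numbers"));
        Ok(())
    }
}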

/// SQL query planner: translates SQL statements and expressions into
/// [`LogicalPlan`]s using schema and function information supplied by the
/// [`ContextProvider`].
pub struct SqlToRel<'a, S: ContextProvider> {
    pub(crate) context_provider: &'a S,
    pub(crate) options: ParserOptions,
    pub(crate) ident_normalizer: IdentNormalizer,
}

impl<'a, S: ContextProvider> SqlToRel<'a, S> {
    /// Creates a new query planner, reading parser options from the context provider.
    pub fn new(context_provider: &'a S) -> Self {
        let parser_options = ParserOptions::from(&context_provider.options().sql_parser);
        Self::new_with_options(context_provider, parser_options)
    }

    /// Creates a new query planner with the given parser options.
    pub fn new_with_options(context_provider: &'a S, options: ParserOptions) -> Self {
        let ident_normalize = options.enable_ident_normalization;

        SqlToRel {
            context_provider,
            options,
            ident_normalizer: IdentNormalizer::new(ident_normalize),
        }
    }

    /// Builds an Arrow [`Schema`] from SQL column definitions, honoring
    /// `NOT NULL` constraints and identifier normalization.
    pub fn build_schema(&self, columns: Vec<SQLColumnDef>) -> Result<Schema> {
        let mut fields = Vec::with_capacity(columns.len());

        for column in columns {
            let field = self.convert_data_type_to_field(&column.data_type)?;
            let not_nullable = column
                .options
                .iter()
                .any(|x| x.option == ColumnOption::NotNull);
            fields.push(
                field
                    .as_ref()
                    .clone()
                    .with_name(self.ident_normalizer.normalize(column.name))
                    .with_nullable(!not_nullable),
            );
        }

        Ok(Schema::new(fields))
    }

    /// Returns a vector of `(column_name, default_expr)` pairs for the columns
    /// that declare a `DEFAULT` expression.
    pub(super) fn build_column_defaults(
        &self,
        columns: &[SQLColumnDef],
        planner_context: &mut PlannerContext,
    ) -> Result<Vec<(String, Expr)>> {
        let mut column_defaults = vec![];
        let empty_schema = DFSchema::empty();
        let error_desc = |e: DataFusionError| match e {
            DataFusionError::SchemaError(ref err, _)
                if matches!(**err, SchemaError::FieldNotFound { .. }) =>
            {
                plan_datafusion_err!(
                    "Column reference is not allowed in the DEFAULT expression: {}",
                    e
                )
            }
            _ => e,
        };

        for column in columns {
            if let Some(default_sql_expr) =
                column.options.iter().find_map(|o| match &o.option {
                    ColumnOption::Default(expr) => Some(expr),
                    _ => None,
                })
            {
                let default_expr = self
                    .sql_to_expr(default_sql_expr.clone(), &empty_schema, planner_context)
                    .map_err(error_desc)?;
                column_defaults.push((
                    self.ident_normalizer.normalize(column.name.clone()),
                    default_expr,
                ));
            }
        }
        Ok(column_defaults)
    }

    /// Wraps the plan in an alias, applying any column aliases from the SQL `AS` clause.
    pub(crate) fn apply_table_alias(
        &self,
        plan: LogicalPlan,
        alias: TableAlias,
    ) -> Result<LogicalPlan> {
        let idents = alias.columns.into_iter().map(|c| c.name).collect();
        let plan = self.apply_expr_alias(plan, idents)?;

        LogicalPlanBuilder::from(plan)
            .alias(TableReference::bare(
                self.ident_normalizer.normalize(alias.name),
            ))?
            .build()
    }

    /// Renames the plan's output columns to the given identifiers, which must
    /// match the number of fields in the plan's schema.
    pub(crate) fn apply_expr_alias(
        &self,
        plan: LogicalPlan,
        idents: Vec<Ident>,
    ) -> Result<LogicalPlan> {
        if idents.is_empty() {
            Ok(plan)
        } else if idents.len() != plan.schema().fields().len() {
            plan_err!(
                "Source table contains {} columns but only {} \
                names given as column aliases",
                plan.schema().fields().len(),
                idents.len()
            )
        } else {
            let fields = plan.schema().fields().clone();
            LogicalPlanBuilder::from(plan)
                .project(fields.iter().zip(idents.into_iter()).map(|(field, ident)| {
                    col(field.name()).alias(self.ident_normalizer.normalize(ident))
                }))?
                .build()
        }
    }

    pub(crate) fn validate_schema_satisfies_exprs(
        &self,
        schema: &DFSchema,
        exprs: &[Expr],
    ) -> Result<()> {
        exprs
            .iter()
            .flat_map(|expr| expr.column_refs())
            .try_for_each(|col| {
                match &col.relation {
                    Some(r) => schema.field_with_qualified_name(r, &col.name).map(|_| ()),
                    None => {
                        if !schema.fields_with_unqualified_name(&col.name).is_empty() {
                            Ok(())
                        } else {
                            Err(field_not_found(
                                col.relation.clone(),
                                col.name.as_str(),
                                schema,
                            ))
                        }
                    }
                }
                .map_err(|err: DataFusionError| match &err {
                    DataFusionError::SchemaError(inner, _)
                        if matches!(
                            inner.as_ref(),
                            SchemaError::FieldNotFound { .. }
                        ) =>
                    {
                        let SchemaError::FieldNotFound {
                            field,
                            valid_fields,
                        } = inner.as_ref()
                        else {
                            unreachable!()
                        };
                        let mut diagnostic = if let Some(relation) = &col.relation {
                            Diagnostic::new_error(
                                format!(
                                    "column '{}' not found in '{}'",
                                    &col.name, relation
                                ),
                                col.spans().first(),
                            )
                        } else {
                            Diagnostic::new_error(
                                format!("column '{}' not found", &col.name),
                                col.spans().first(),
                            )
                        };
                        add_possible_columns_to_diag(
                            &mut diagnostic,
                            field,
                            valid_fields,
                        );
                        err.with_diagnostic(diagnostic)
                    }
                    _ => err,
                })
            })
    }

    /// Converts a SQL data type into an Arrow [`FieldRef`], consulting the
    /// optional type planner first and handling array types.
    pub(crate) fn convert_data_type_to_field(
        &self,
        sql_type: &SQLDataType,
    ) -> Result<FieldRef> {
        if let Some(type_planner) = self.context_provider.get_type_planner() {
            if let Some(data_type) = type_planner.plan_type(sql_type)? {
                return Ok(data_type.into_nullable_field_ref());
            }
        }

        match sql_type {
            SQLDataType::Array(ArrayElemTypeDef::AngleBracket(inner_sql_type)) => {
                Ok(self.convert_data_type_to_field(inner_sql_type)?.into_list())
            }
            SQLDataType::Array(ArrayElemTypeDef::SquareBracket(
                inner_sql_type,
                maybe_array_size,
            )) => {
                let inner_field = self.convert_data_type_to_field(inner_sql_type)?;
                if let Some(array_size) = maybe_array_size {
                    let array_size: i32 = (*array_size).try_into().map_err(|_| {
                        plan_datafusion_err!(
                            "Array size must be a positive 32 bit integer, got {array_size}"
                        )
                    })?;
                    Ok(inner_field.into_fixed_size_list(array_size))
                } else {
                    Ok(inner_field.into_list())
                }
            }
            SQLDataType::Array(ArrayElemTypeDef::None) => {
                not_impl_err!("Arrays with unspecified type are not supported")
            }
            other => Ok(self
                .convert_simple_data_type(other)?
                .into_nullable_field_ref()),
        }
    }

    fn convert_simple_data_type(&self, sql_type: &SQLDataType) -> Result<DataType> {
        match sql_type {
            SQLDataType::Boolean | SQLDataType::Bool => Ok(DataType::Boolean),
            SQLDataType::TinyInt(_) => Ok(DataType::Int8),
            SQLDataType::SmallInt(_) | SQLDataType::Int2(_) => Ok(DataType::Int16),
            SQLDataType::Int(_) | SQLDataType::Integer(_) | SQLDataType::Int4(_) => {
                Ok(DataType::Int32)
            }
            SQLDataType::BigInt(_) | SQLDataType::Int8(_) => Ok(DataType::Int64),
            SQLDataType::TinyIntUnsigned(_) => Ok(DataType::UInt8),
            SQLDataType::SmallIntUnsigned(_) | SQLDataType::Int2Unsigned(_) => {
                Ok(DataType::UInt16)
            }
            SQLDataType::IntUnsigned(_)
            | SQLDataType::IntegerUnsigned(_)
            | SQLDataType::Int4Unsigned(_) => Ok(DataType::UInt32),
            SQLDataType::Varchar(length) => {
                match (length, self.options.support_varchar_with_length) {
                    (Some(_), false) => plan_err!(
                        "Varchar with a length is not supported; \
                        set `support_varchar_with_length` to true to enable it"
                    ),
                    _ => {
                        if self.options.map_string_types_to_utf8view {
                            Ok(DataType::Utf8View)
                        } else {
                            Ok(DataType::Utf8)
                        }
                    }
                }
            }
            SQLDataType::BigIntUnsigned(_) | SQLDataType::Int8Unsigned(_) => {
                Ok(DataType::UInt64)
            }
            SQLDataType::Float(_) => Ok(DataType::Float32),
            SQLDataType::Real | SQLDataType::Float4 => Ok(DataType::Float32),
            SQLDataType::Double(ExactNumberInfo::None)
            | SQLDataType::DoublePrecision
            | SQLDataType::Float8 => Ok(DataType::Float64),
            SQLDataType::Double(
                ExactNumberInfo::Precision(_) | ExactNumberInfo::PrecisionAndScale(_, _),
            ) => {
                not_impl_err!(
                    "Unsupported SQL type (precision/scale not supported) {sql_type}"
                )
            }
            SQLDataType::Char(_) | SQLDataType::Text | SQLDataType::String(_) => {
                if self.options.map_string_types_to_utf8view {
                    Ok(DataType::Utf8View)
                } else {
                    Ok(DataType::Utf8)
                }
            }
            SQLDataType::Timestamp(precision, tz_info)
                if precision.is_none() || [0, 3, 6, 9].contains(&precision.unwrap()) =>
            {
                // TIMESTAMP WITH TIME ZONE uses the session time zone;
                // plain TIMESTAMP carries no time zone.
                let tz = if matches!(tz_info, TimezoneInfo::Tz)
                    || matches!(tz_info, TimezoneInfo::WithTimeZone)
                {
                    self.context_provider.options().execution.time_zone.clone()
                } else {
                    None
                };
                let precision = match precision {
                    Some(0) => TimeUnit::Second,
                    Some(3) => TimeUnit::Millisecond,
                    Some(6) => TimeUnit::Microsecond,
                    None | Some(9) => TimeUnit::Nanosecond,
                    _ => unreachable!(),
                };
                Ok(DataType::Timestamp(precision, tz.map(Into::into)))
            }
            SQLDataType::Date => Ok(DataType::Date32),
            SQLDataType::Time(None, tz_info) => {
                if matches!(tz_info, TimezoneInfo::None)
                    || matches!(tz_info, TimezoneInfo::WithoutTimeZone)
                {
                    Ok(DataType::Time64(TimeUnit::Nanosecond))
                } else {
                    not_impl_err!("Unsupported SQL type {sql_type}")
                }
            }
            SQLDataType::Numeric(exact_number_info)
            | SQLDataType::Decimal(exact_number_info) => {
                let (precision, scale) = match *exact_number_info {
                    ExactNumberInfo::None => (None, None),
                    ExactNumberInfo::Precision(precision) => (Some(precision), None),
                    ExactNumberInfo::PrecisionAndScale(precision, scale) => {
                        (Some(precision), Some(scale))
                    }
                };
                make_decimal_type(precision, scale.map(|s| s as u64))
            }
            SQLDataType::Bytea => Ok(DataType::Binary),
            SQLDataType::Interval { fields, precision } => {
                if fields.is_some() || precision.is_some() {
                    return not_impl_err!("Unsupported SQL type {sql_type}");
                }
                Ok(DataType::Interval(IntervalUnit::MonthDayNano))
            }
            SQLDataType::Struct(fields, _) => {
                let fields = fields
                    .iter()
                    .enumerate()
                    .map(|(idx, sql_struct_field)| {
                        let field = self
                            .convert_data_type_to_field(&sql_struct_field.field_type)?;
                        // Unnamed struct fields get positional names: c0, c1, ...
                        let field_name = match &sql_struct_field.field_name {
                            Some(ident) => ident.clone(),
                            None => Ident::new(format!("c{idx}")),
                        };
                        Ok(field
                            .as_ref()
                            .clone()
                            .with_name(self.ident_normalizer.normalize(field_name)))
                    })
                    .collect::<Result<Vec<_>>>()?;
                Ok(DataType::Struct(Fields::from(fields)))
            }
            SQLDataType::Nvarchar(_)
            | SQLDataType::JSON
            | SQLDataType::Uuid
            | SQLDataType::Binary(_)
            | SQLDataType::Varbinary(_)
            | SQLDataType::Blob(_)
            | SQLDataType::Datetime(_)
            | SQLDataType::Regclass
            | SQLDataType::Custom(_, _)
            | SQLDataType::Array(_)
            | SQLDataType::Enum(_, _)
            | SQLDataType::Set(_)
            | SQLDataType::MediumInt(_)
            | SQLDataType::MediumIntUnsigned(_)
            | SQLDataType::Character(_)
            | SQLDataType::CharacterVarying(_)
            | SQLDataType::CharVarying(_)
            | SQLDataType::CharacterLargeObject(_)
            | SQLDataType::CharLargeObject(_)
            | SQLDataType::Timestamp(_, _)
            | SQLDataType::Time(Some(_), _)
            | SQLDataType::Dec(_)
            | SQLDataType::BigNumeric(_)
            | SQLDataType::BigDecimal(_)
            | SQLDataType::Clob(_)
            | SQLDataType::Bytes(_)
            | SQLDataType::Int64
            | SQLDataType::Float64
            | SQLDataType::JSONB
            | SQLDataType::Unspecified
            | SQLDataType::Int16
            | SQLDataType::Int32
            | SQLDataType::Int128
            | SQLDataType::Int256
            | SQLDataType::UInt8
            | SQLDataType::UInt16
            | SQLDataType::UInt32
            | SQLDataType::UInt64
            | SQLDataType::UInt128
            | SQLDataType::UInt256
            | SQLDataType::Float32
            | SQLDataType::Date32
            | SQLDataType::Datetime64(_, _)
            | SQLDataType::FixedString(_)
            | SQLDataType::Map(_, _)
            | SQLDataType::Tuple(_)
            | SQLDataType::Nested(_)
            | SQLDataType::Union(_)
            | SQLDataType::Nullable(_)
            | SQLDataType::LowCardinality(_)
            | SQLDataType::Trigger
            | SQLDataType::TinyBlob
            | SQLDataType::MediumBlob
            | SQLDataType::LongBlob
            | SQLDataType::TinyText
            | SQLDataType::MediumText
            | SQLDataType::LongText
            | SQLDataType::Bit(_)
            | SQLDataType::BitVarying(_)
            | SQLDataType::Signed
            | SQLDataType::SignedInteger
            | SQLDataType::Unsigned
            | SQLDataType::UnsignedInteger
            | SQLDataType::AnyType
            | SQLDataType::Table(_)
            | SQLDataType::VarBit(_)
            | SQLDataType::UTinyInt
            | SQLDataType::USmallInt
            | SQLDataType::HugeInt
            | SQLDataType::UHugeInt
            | SQLDataType::UBigInt
            | SQLDataType::TimestampNtz
            | SQLDataType::NamedTable { .. }
            | SQLDataType::TsVector
            | SQLDataType::TsQuery
            | SQLDataType::GeometricType(_)
            | SQLDataType::DecimalUnsigned(_)
            | SQLDataType::FloatUnsigned(_)
            | SQLDataType::RealUnsigned
            | SQLDataType::DecUnsigned(_)
            | SQLDataType::DoubleUnsigned(_)
            | SQLDataType::DoublePrecisionUnsigned => {
                not_impl_err!("Unsupported SQL type {sql_type}")
            }
        }
    }

    pub(crate) fn object_name_to_table_reference(
        &self,
        object_name: ObjectName,
    ) -> Result<TableReference> {
        object_name_to_table_reference(
            object_name,
            self.options.enable_ident_normalization,
        )
    }
}

/// Creates a [`TableReference`] from a SQL [`ObjectName`], normalizing
/// identifiers if `enable_normalization` is true.
pub fn object_name_to_table_reference(
    object_name: ObjectName,
    enable_normalization: bool,
) -> Result<TableReference> {
    let ObjectName(object_name_parts) = object_name;
    let idents = object_name_parts
        .into_iter()
        .map(|object_name_part| {
            object_name_part.as_ident().cloned().ok_or_else(|| {
                plan_datafusion_err!(
                    "Expected identifier, but found: {:?}",
                    object_name_part
                )
            })
        })
        .collect::<Result<Vec<_>>>()?;
    idents_to_table_reference(idents, enable_normalization)
}

/// Pops identifiers off the end of a list, normalizing them as configured.
struct IdentTaker {
    normalizer: IdentNormalizer,
    idents: Vec<Ident>,
}

impl IdentTaker {
    fn new(idents: Vec<Ident>, enable_normalization: bool) -> Self {
        Self {
            normalizer: IdentNormalizer::new(enable_normalization),
            idents,
        }
    }

    /// Takes the next identifier from the back of the list, panicking if none remain.
    fn take(&mut self) -> String {
        let ident = self.idents.pop().expect("no more identifiers");
        self.normalizer.normalize(ident)
    }

    /// Returns the number of remaining identifiers.
    fn len(&self) -> usize {
        self.idents.len()
    }
}

impl std::fmt::Display for IdentTaker {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut first = true;
        for ident in self.idents.iter() {
            if !first {
                write!(f, ".")?;
            }
            write!(f, "{ident}")?;
            first = false;
        }

        Ok(())
    }
}

pub(crate) fn idents_to_table_reference(
    idents: Vec<Ident>,
    enable_normalization: bool,
) -> Result<TableReference> {
    let mut taker = IdentTaker::new(idents, enable_normalization);

    match taker.len() {
        1 => {
            let table = taker.take();
            Ok(TableReference::bare(table))
        }
        2 => {
            let table = taker.take();
            let schema = taker.take();
            Ok(TableReference::partial(schema, table))
        }
        3 => {
            let table = taker.take();
            let schema = taker.take();
            let catalog = taker.take();
            Ok(TableReference::full(catalog, schema, table))
        }
        _ => plan_err!(
            "Unsupported compound identifier '{}'. Expected 1, 2 or 3 parts, got {}",
            taker,
            taker.len()
        ),
    }
}
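
// Illustrative sketch of the resolution above: identifiers are taken from the
// right (table first, then schema, then catalog), and normalization lowercases
// unquoted parts.
#[cfg(test)]
mod table_reference_example {
    use super::*;

    #[test]
    fn resolves_two_part_identifiers() -> Result<()> {
        let idents = vec![Ident::new("MySchema"), Ident::new("MyTable")];
        let reference = idents_to_table_reference(idents, true)?;
        assert_eq!(reference, TableReference::partial("myschema", "mytable"));

        // With normalization disabled the original casing is preserved.
        let idents = vec![Ident::new("MySchema"), Ident::new("MyTable")];
        let reference = idents_to_table_reference(idents, false)?;
        assert_eq!(reference, TableReference::partial("MySchema", "MyTable"));
        Ok(())
    }
}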

/// Builds a conjunction of `table_name` / `table_schema` / `table_catalog`
/// equality predicates from a SQL object name, suitable for filtering
/// `information_schema`-style tables.
pub fn object_name_to_qualifier(
    sql_table_name: &ObjectName,
    enable_normalization: bool,
) -> Result<String> {
    let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter();
    let normalizer = IdentNormalizer::new(enable_normalization);
    sql_table_name
        .0
        .iter()
        .rev()
        .zip(columns)
        .map(|(object_name_part, column_name)| {
            object_name_part
                .as_ident()
                .map(|ident| {
                    format!(
                        r#"{} = '{}'"#,
                        column_name,
                        normalizer.normalize(ident.clone())
                    )
                })
                .ok_or_else(|| {
                    plan_datafusion_err!(
                        "Expected identifier, but found: {:?}",
                        object_name_part
                    )
                })
        })
        .collect::<Result<Vec<_>>>()
        .map(|parts| parts.join(" AND "))
}
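
// Illustrative sketch, assuming sqlparser provides a `From<Vec<Ident>>`
// conversion for `ObjectName`: a two-part name is turned into the
// `table_name` / `table_schema` predicate built by `object_name_to_qualifier`
// above.
#[cfg(test)]
mod qualifier_example {
    use super::*;

    #[test]
    fn builds_qualifier_predicate() -> Result<()> {
        let name = ObjectName::from(vec![Ident::new("Public"), Ident::new("Users")]);
        let qualifier = object_name_to_qualifier(&name, true)?;
        assert_eq!(
            qualifier,
            "table_name = 'users' AND table_schema = 'public'"
        );
        Ok(())
    }
}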