datafusion_common/
dfschema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! DFSchema is an extended schema struct that DataFusion uses to provide support for
19//! fields with optional relation names.
20
21use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28    field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29    SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34    DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
/// A reference-counted reference to a [DFSchema].
///
/// This is an alias for `Arc<DFSchema>`.
pub type DFSchemaRef = Arc<DFSchema>;
39
40/// DFSchema wraps an Arrow schema and adds relation names.
41///
42/// The schema may hold the fields across multiple tables. Some fields may be
43/// qualified and some unqualified. A qualified field is a field that has a
44/// relation name associated with it.
45///
46/// Unqualified fields must be unique not only amongst themselves, but also must
47/// have a distinct name from any qualified field names. This allows finding a
48/// qualified field by name to be possible, so long as there aren't multiple
49/// qualified fields with the same name.
50///
51/// There is an alias to `Arc<DFSchema>` named [DFSchemaRef].
52///
53/// # Creating qualified schemas
54///
55/// Use [DFSchema::try_from_qualified_schema] to create a qualified schema from
56/// an Arrow schema.
57///
58/// ```rust
59/// use arrow::datatypes::{DataType, Field, Schema};
60/// use datafusion_common::{Column, DFSchema};
61///
62/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
63///
64/// let df_schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema).unwrap();
65/// let column = Column::from_qualified_name("t1.c1");
66/// assert!(df_schema.has_column(&column));
67///
68/// // Can also access qualified fields with unqualified name, if it's unambiguous
69/// let column = Column::from_qualified_name("c1");
70/// assert!(df_schema.has_column(&column));
71/// ```
72///
73/// # Creating unqualified schemas
74///
75/// Create an unqualified schema using TryFrom:
76///
77/// ```rust
78/// use arrow::datatypes::{DataType, Field, Schema};
79/// use datafusion_common::{Column, DFSchema};
80///
81/// let arrow_schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
82///
83/// let df_schema = DFSchema::try_from(arrow_schema).unwrap();
84/// let column = Column::new_unqualified("c1");
85/// assert!(df_schema.has_column(&column));
86/// ```
87///
88/// # Converting back to Arrow schema
89///
/// Use [`DFSchema::as_arrow`] to borrow the inner Arrow schema of a `DFSchema`:
91///
92/// ```rust
93/// use arrow::datatypes::{Field, Schema};
94/// use datafusion_common::DFSchema;
95/// use std::collections::HashMap;
96///
97/// let df_schema = DFSchema::from_unqualified_fields(
98///     vec![Field::new("c1", arrow::datatypes::DataType::Int32, false)].into(),
99///     HashMap::new(),
100/// )
101/// .unwrap();
102/// let schema: &Schema = df_schema.as_arrow();
103/// assert_eq!(schema.fields().len(), 1);
104/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DFSchema {
    /// Inner Arrow schema reference. It carries the fields and the
    /// schema-level metadata, but no qualifier information.
    inner: SchemaRef,
    /// Optional qualifiers for each column in this schema. In the same order as
    /// the `self.inner.fields()`; `None` marks an unqualified column.
    field_qualifiers: Vec<Option<TableReference>>,
    /// Stores functional dependencies in the schema.
    functional_dependencies: FunctionalDependencies,
}
115
116impl DFSchema {
117    /// Creates an empty `DFSchema`
118    pub fn empty() -> Self {
119        Self {
120            inner: Arc::new(Schema::new([])),
121            field_qualifiers: vec![],
122            functional_dependencies: FunctionalDependencies::empty(),
123        }
124    }
125
126    /// Return a reference to the inner Arrow [`Schema`]
127    ///
128    /// Note this does not have the qualifier information
129    pub fn as_arrow(&self) -> &Schema {
130        self.inner.as_ref()
131    }
132
133    /// Return a reference to the inner Arrow [`SchemaRef`]
134    ///
135    /// Note this does not have the qualifier information
136    pub fn inner(&self) -> &SchemaRef {
137        &self.inner
138    }
139
140    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
141    pub fn new_with_metadata(
142        qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
143        metadata: HashMap<String, String>,
144    ) -> Result<Self> {
145        let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
146            qualified_fields.into_iter().unzip();
147
148        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
149
150        let dfschema = Self {
151            inner: schema,
152            field_qualifiers: qualifiers,
153            functional_dependencies: FunctionalDependencies::empty(),
154        };
155        dfschema.check_names()?;
156        Ok(dfschema)
157    }
158
159    /// Create a new `DFSchema` from a list of Arrow [Field]s
160    pub fn from_unqualified_fields(
161        fields: Fields,
162        metadata: HashMap<String, String>,
163    ) -> Result<Self> {
164        let field_count = fields.len();
165        let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
166        let dfschema = Self {
167            inner: schema,
168            field_qualifiers: vec![None; field_count],
169            functional_dependencies: FunctionalDependencies::empty(),
170        };
171        dfschema.check_names()?;
172        Ok(dfschema)
173    }
174
175    /// Create a `DFSchema` from an Arrow schema and a given qualifier
176    ///
177    /// To create a schema from an Arrow schema without a qualifier, use
178    /// `DFSchema::try_from`.
179    pub fn try_from_qualified_schema(
180        qualifier: impl Into<TableReference>,
181        schema: &Schema,
182    ) -> Result<Self> {
183        let qualifier = qualifier.into();
184        let schema = DFSchema {
185            inner: schema.clone().into(),
186            field_qualifiers: vec![Some(qualifier); schema.fields.len()],
187            functional_dependencies: FunctionalDependencies::empty(),
188        };
189        schema.check_names()?;
190        Ok(schema)
191    }
192
193    /// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
194    pub fn from_field_specific_qualified_schema(
195        qualifiers: Vec<Option<TableReference>>,
196        schema: &SchemaRef,
197    ) -> Result<Self> {
198        let dfschema = Self {
199            inner: Arc::clone(schema),
200            field_qualifiers: qualifiers,
201            functional_dependencies: FunctionalDependencies::empty(),
202        };
203        dfschema.check_names()?;
204        Ok(dfschema)
205    }
206
207    /// Return the same schema, where all fields have a given qualifier.
208    pub fn with_field_specific_qualified_schema(
209        &self,
210        qualifiers: Vec<Option<TableReference>>,
211    ) -> Result<Self> {
212        if qualifiers.len() != self.fields().len() {
213            return _plan_err!(
214                "Number of qualifiers must match number of fields. Expected {}, got {}",
215                self.fields().len(),
216                qualifiers.len()
217            );
218        }
219        Ok(DFSchema {
220            inner: Arc::clone(&self.inner),
221            field_qualifiers: qualifiers,
222            functional_dependencies: self.functional_dependencies.clone(),
223        })
224    }
225
226    /// Check if the schema have some fields with the same name
227    pub fn check_names(&self) -> Result<()> {
228        let mut qualified_names = BTreeSet::new();
229        let mut unqualified_names = BTreeSet::new();
230
231        for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
232            if let Some(qualifier) = qualifier {
233                if !qualified_names.insert((qualifier, field.name())) {
234                    return _schema_err!(SchemaError::DuplicateQualifiedField {
235                        qualifier: Box::new(qualifier.clone()),
236                        name: field.name().to_string(),
237                    });
238                }
239            } else if !unqualified_names.insert(field.name()) {
240                return _schema_err!(SchemaError::DuplicateUnqualifiedField {
241                    name: field.name().to_string()
242                });
243            }
244        }
245
246        for (qualifier, name) in qualified_names {
247            if unqualified_names.contains(name) {
248                return _schema_err!(SchemaError::AmbiguousReference {
249                    field: Box::new(Column::new(Some(qualifier.clone()), name))
250                });
251            }
252        }
253        Ok(())
254    }
255
256    /// Assigns functional dependencies.
257    pub fn with_functional_dependencies(
258        mut self,
259        functional_dependencies: FunctionalDependencies,
260    ) -> Result<Self> {
261        if functional_dependencies.is_valid(self.inner.fields.len()) {
262            self.functional_dependencies = functional_dependencies;
263            Ok(self)
264        } else {
265            _plan_err!(
266                "Invalid functional dependency: {:?}",
267                functional_dependencies
268            )
269        }
270    }
271
272    /// Create a new schema that contains the fields from this schema followed by the fields
273    /// from the supplied schema. An error will be returned if there are duplicate field names.
274    pub fn join(&self, schema: &DFSchema) -> Result<Self> {
275        let mut schema_builder = SchemaBuilder::new();
276        schema_builder.extend(self.inner.fields().iter().cloned());
277        schema_builder.extend(schema.fields().iter().cloned());
278        let new_schema = schema_builder.finish();
279
280        let mut new_metadata = self.inner.metadata.clone();
281        new_metadata.extend(schema.inner.metadata.clone());
282        let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
283
284        let mut new_qualifiers = self.field_qualifiers.clone();
285        new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
286
287        let new_self = Self {
288            inner: Arc::new(new_schema_with_metadata),
289            field_qualifiers: new_qualifiers,
290            functional_dependencies: FunctionalDependencies::empty(),
291        };
292        new_self.check_names()?;
293        Ok(new_self)
294    }
295
296    /// Modify this schema by appending the fields from the supplied schema, ignoring any
297    /// duplicate fields.
298    ///
299    /// ## Merge Precedence
300    ///
301    /// **Schema-level metadata**: Metadata from both schemas is merged.
302    /// If both schemas have the same metadata key, the value from the `other_schema` parameter takes precedence.
303    ///
304    /// **Field-level merging**: Only non-duplicate fields are added. This means that the
305    /// `self` fields will always take precedence over the `other_schema` fields.
306    /// Duplicate field detection is based on:
307    /// - For qualified fields: both qualifier and field name must match
308    /// - For unqualified fields: only field name needs to match
309    ///
310    /// Take note how the precedence for fields & metadata merging differs;
311    /// merging prefers fields from `self` but prefers metadata from `other_schema`.
312    pub fn merge(&mut self, other_schema: &DFSchema) {
313        if other_schema.inner.fields.is_empty() {
314            return;
315        }
316
317        let self_fields: HashSet<(Option<&TableReference>, &str)> = self
318            .iter()
319            .map(|(qualifier, field)| (qualifier, field.name().as_str()))
320            .collect();
321        let self_unqualified_names: HashSet<&str> = self
322            .inner
323            .fields
324            .iter()
325            .map(|field| field.name().as_str())
326            .collect();
327
328        let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
329        let mut qualifiers = Vec::new();
330        for (qualifier, field) in other_schema.iter() {
331            // skip duplicate columns
332            let duplicated_field = match qualifier {
333                Some(q) => {
334                    self_fields.contains(&(Some(q), field.name().as_str()))
335                        || self_fields.contains(&(None, field.name().as_str()))
336                }
337                // for unqualified columns, check as unqualified name
338                None => self_unqualified_names.contains(field.name().as_str()),
339            };
340            if !duplicated_field {
341                schema_builder.push(Arc::clone(field));
342                qualifiers.push(qualifier.cloned());
343            }
344        }
345        let mut metadata = self.inner.metadata.clone();
346        metadata.extend(other_schema.inner.metadata.clone());
347
348        let finished = schema_builder.finish();
349        let finished_with_metadata = finished.with_metadata(metadata);
350        self.inner = finished_with_metadata.into();
351        self.field_qualifiers.extend(qualifiers);
352    }
353
354    /// Get a list of fields
355    pub fn fields(&self) -> &Fields {
356        &self.inner.fields
357    }
358
359    /// Returns an immutable reference of a specific `Field` instance selected using an
360    /// offset within the internal `fields` vector
361    pub fn field(&self, i: usize) -> &Field {
362        &self.inner.fields[i]
363    }
364
365    /// Returns an immutable reference of a specific `Field` instance selected using an
366    /// offset within the internal `fields` vector and its qualifier
367    pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
368        (self.field_qualifiers[i].as_ref(), self.field(i))
369    }
370
371    pub fn index_of_column_by_name(
372        &self,
373        qualifier: Option<&TableReference>,
374        name: &str,
375    ) -> Option<usize> {
376        let mut matches = self
377            .iter()
378            .enumerate()
379            .filter(|(_, (q, f))| match (qualifier, q) {
380                // field to lookup is qualified.
381                // current field is qualified and not shared between relations, compare both
382                // qualifier and name.
383                (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
384                // field to lookup is qualified but current field is unqualified.
385                (Some(_), None) => false,
386                // field to lookup is unqualified, no need to compare qualifier
387                (None, Some(_)) | (None, None) => f.name() == name,
388            })
389            .map(|(idx, _)| idx);
390        matches.next()
391    }
392
393    /// Find the index of the column with the given qualifier and name,
394    /// returning `None` if not found
395    ///
396    /// See [Self::index_of_column] for a version that returns an error if the
397    /// column is not found
398    pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
399        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
400    }
401
402    /// Find the index of the column with the given qualifier and name,
403    /// returning `Err` if not found
404    ///
405    /// See [Self::maybe_index_of_column] for a version that returns `None` if
406    /// the column is not found
407    pub fn index_of_column(&self, col: &Column) -> Result<usize> {
408        self.maybe_index_of_column(col)
409            .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
410    }
411
412    /// Check if the column is in the current schema
413    pub fn is_column_from_schema(&self, col: &Column) -> bool {
414        self.index_of_column_by_name(col.relation.as_ref(), &col.name)
415            .is_some()
416    }
417
418    /// Find the field with the given name
419    pub fn field_with_name(
420        &self,
421        qualifier: Option<&TableReference>,
422        name: &str,
423    ) -> Result<&Field> {
424        if let Some(qualifier) = qualifier {
425            self.field_with_qualified_name(qualifier, name)
426        } else {
427            self.field_with_unqualified_name(name)
428        }
429    }
430
431    /// Find the qualified field with the given name
432    pub fn qualified_field_with_name(
433        &self,
434        qualifier: Option<&TableReference>,
435        name: &str,
436    ) -> Result<(Option<&TableReference>, &Field)> {
437        if let Some(qualifier) = qualifier {
438            let idx = self
439                .index_of_column_by_name(Some(qualifier), name)
440                .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
441            Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
442        } else {
443            self.qualified_field_with_unqualified_name(name)
444        }
445    }
446
447    /// Find all fields having the given qualifier
448    pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
449        self.iter()
450            .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
451            .map(|(_, f)| f.as_ref())
452            .collect()
453    }
454
455    /// Find all fields indices having the given qualifier
456    pub fn fields_indices_with_qualified(
457        &self,
458        qualifier: &TableReference,
459    ) -> Vec<usize> {
460        self.iter()
461            .enumerate()
462            .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
463            .collect()
464    }
465
466    /// Find all fields that match the given name
467    pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
468        self.fields()
469            .iter()
470            .filter(|field| field.name() == name)
471            .map(|f| f.as_ref())
472            .collect()
473    }
474
475    /// Find all fields that match the given name and return them with their qualifier
476    pub fn qualified_fields_with_unqualified_name(
477        &self,
478        name: &str,
479    ) -> Vec<(Option<&TableReference>, &Field)> {
480        self.iter()
481            .filter(|(_, field)| field.name() == name)
482            .map(|(qualifier, field)| (qualifier, field.as_ref()))
483            .collect()
484    }
485
486    /// Find all fields that match the given name and convert to column
487    pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
488        self.iter()
489            .filter(|(_, field)| field.name() == name)
490            .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
491            .collect()
492    }
493
494    /// Return all `Column`s for the schema
495    pub fn columns(&self) -> Vec<Column> {
496        self.iter()
497            .map(|(qualifier, field)| {
498                Column::new(qualifier.cloned(), field.name().clone())
499            })
500            .collect()
501    }
502
503    /// Find the qualified field with the given unqualified name
504    pub fn qualified_field_with_unqualified_name(
505        &self,
506        name: &str,
507    ) -> Result<(Option<&TableReference>, &Field)> {
508        let matches = self.qualified_fields_with_unqualified_name(name);
509        match matches.len() {
510            0 => Err(unqualified_field_not_found(name, self)),
511            1 => Ok((matches[0].0, matches[0].1)),
512            _ => {
513                // When `matches` size > 1, it doesn't necessarily mean an `ambiguous name` problem.
514                // Because name may generate from Alias/... . It means that it don't own qualifier.
515                // For example:
516                //             Join on id = b.id
517                // Project a.id as id   TableScan b id
518                // In this case, there isn't `ambiguous name` problem. When `matches` just contains
519                // one field without qualifier, we should return it.
520                let fields_without_qualifier = matches
521                    .iter()
522                    .filter(|(q, _)| q.is_none())
523                    .collect::<Vec<_>>();
524                if fields_without_qualifier.len() == 1 {
525                    Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
526                } else {
527                    _schema_err!(SchemaError::AmbiguousReference {
528                        field: Box::new(Column::new_unqualified(name.to_string()))
529                    })
530                }
531            }
532        }
533    }
534
535    /// Find the field with the given name
536    pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
537        self.qualified_field_with_unqualified_name(name)
538            .map(|(_, field)| field)
539    }
540
541    /// Find the field with the given qualified name
542    pub fn field_with_qualified_name(
543        &self,
544        qualifier: &TableReference,
545        name: &str,
546    ) -> Result<&Field> {
547        let idx = self
548            .index_of_column_by_name(Some(qualifier), name)
549            .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
550
551        Ok(self.field(idx))
552    }
553
554    /// Find the field with the given qualified column
555    pub fn qualified_field_from_column(
556        &self,
557        column: &Column,
558    ) -> Result<(Option<&TableReference>, &Field)> {
559        self.qualified_field_with_name(column.relation.as_ref(), &column.name)
560    }
561
562    /// Find if the field exists with the given name
563    pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
564        self.fields().iter().any(|field| field.name() == name)
565    }
566
567    /// Find if the field exists with the given qualified name
568    pub fn has_column_with_qualified_name(
569        &self,
570        qualifier: &TableReference,
571        name: &str,
572    ) -> bool {
573        self.iter()
574            .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
575    }
576
577    /// Find if the field exists with the given qualified column
578    pub fn has_column(&self, column: &Column) -> bool {
579        match &column.relation {
580            Some(r) => self.has_column_with_qualified_name(r, &column.name),
581            None => self.has_column_with_unqualified_name(&column.name),
582        }
583    }
584
585    /// Check to see if unqualified field names matches field names in Arrow schema
586    pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
587        self.inner
588            .fields
589            .iter()
590            .zip(arrow_schema.fields().iter())
591            .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
592    }
593
594    /// Check to see if fields in 2 Arrow schemas are compatible
595    #[deprecated(since = "47.0.0", note = "This method is no longer used")]
596    pub fn check_arrow_schema_type_compatible(
597        &self,
598        arrow_schema: &Schema,
599    ) -> Result<()> {
600        let self_arrow_schema = self.as_arrow();
601        self_arrow_schema
602            .fields()
603            .iter()
604            .zip(arrow_schema.fields().iter())
605            .try_for_each(|(l_field, r_field)| {
606                if !can_cast_types(r_field.data_type(), l_field.data_type()) {
607                    _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
608                                r_field.name(),
609                                r_field.data_type(),
610                                l_field.name(),
611                                l_field.data_type())
612                } else {
613                    Ok(())
614                }
615            })
616    }
617
618    /// Returns true if the two schemas have the same qualified named
619    /// fields with logically equivalent data types. Returns false otherwise.
620    ///
621    /// Use [DFSchema]::equivalent_names_and_types for stricter semantic type
622    /// equivalence checking.
623    pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
624        if self.fields().len() != other.fields().len() {
625            return false;
626        }
627        let self_fields = self.iter();
628        let other_fields = other.iter();
629        self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
630            q1 == q2
631                && f1.name() == f2.name()
632                && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
633        })
634    }
635
636    #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
637    pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
638        self.has_equivalent_names_and_types(other).is_ok()
639    }
640
641    /// Returns Ok if the two schemas have the same qualified named
642    /// fields with the compatible data types.
643    ///
644    /// Returns an `Err` with a message otherwise.
645    ///
646    /// This is a specialized version of Eq that ignores differences in
647    /// nullability and metadata.
648    ///
649    /// Use [DFSchema]::logically_equivalent_names_and_types for a weaker
650    /// logical type checking, which for example would consider a dictionary
651    /// encoded UTF8 array to be equivalent to a plain UTF8 array.
652    pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
653        // case 1 : schema length mismatch
654        if self.fields().len() != other.fields().len() {
655            _plan_err!(
656                "Schema mismatch: the schema length are not same \
657            Expected schema length: {}, got: {}",
658                self.fields().len(),
659                other.fields().len()
660            )
661        } else {
662            // case 2 : schema length match, but fields mismatch
663            // check if the fields name are the same and have the same data types
664            self.fields()
665                .iter()
666                .zip(other.fields().iter())
667                .try_for_each(|(f1, f2)| {
668                    if f1.name() != f2.name()
669                        || (!DFSchema::datatype_is_semantically_equal(
670                            f1.data_type(),
671                            f2.data_type(),
672                        ))
673                    {
674                        _plan_err!(
675                            "Schema mismatch: Expected field '{}' with type {}, \
676                            but got '{}' with type {}.",
677                            f1.name(),
678                            f1.data_type(),
679                            f2.name(),
680                            f2.data_type()
681                        )
682                    } else {
683                        Ok(())
684                    }
685                })
686        }
687    }
688
689    /// Checks if two [`DataType`]s are logically equal. This is a notably weaker constraint
690    /// than datatype_is_semantically_equal in that different representations of same data can be
691    /// logically but not semantically equivalent. Semantically equivalent types are always also
692    /// logically equivalent. For example:
693    /// - a Dictionary<K,V> type is logically equal to a plain V type
694    /// - a Dictionary<K1, V1> is also logically equal to Dictionary<K2, V1>
695    /// - Utf8 and Utf8View are logically equal
696    pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
697        // check nested fields
698        match (dt1, dt2) {
699            (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
700                v1.as_ref() == v2.as_ref()
701            }
702            (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
703            (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
704            (DataType::List(f1), DataType::List(f2))
705            | (DataType::LargeList(f1), DataType::LargeList(f2))
706            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
707                // Don't compare the names of the technical inner field
708                // Usually "item" but that's not mandated
709                Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
710            }
711            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
712                // Don't compare the names of the technical inner fields
713                // Usually "entries", "key", "value" but that's not mandated
714                match (f1.data_type(), f2.data_type()) {
715                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
716                        f1_inner.len() == f2_inner.len()
717                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
718                                Self::datatype_is_logically_equal(
719                                    f1.data_type(),
720                                    f2.data_type(),
721                                )
722                            })
723                    }
724                    _ => panic!("Map type should have an inner struct field"),
725                }
726            }
727            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
728                let iter1 = fields1.iter();
729                let iter2 = fields2.iter();
730                fields1.len() == fields2.len() &&
731                        // all fields have to be the same
732                    iter1
733                    .zip(iter2)
734                        .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
735            }
736            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
737                let iter1 = fields1.iter();
738                let iter2 = fields2.iter();
739                fields1.len() == fields2.len() &&
740                    // all fields have to be the same
741                    iter1
742                        .zip(iter2)
743                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
744            }
745            // Utf8 and Utf8View are logically equivalent
746            (DataType::Utf8, DataType::Utf8View) => true,
747            (DataType::Utf8View, DataType::Utf8) => true,
748            _ => Self::datatype_is_semantically_equal(dt1, dt2),
749        }
750    }
751
752    /// Returns true of two [`DataType`]s are semantically equal (same
753    /// name and type), ignoring both metadata and nullability, decimal precision/scale,
754    /// and timezone time units/timezones.
755    ///
756    /// request to upstream: <https://github.com/apache/arrow-rs/issues/3199>
757    pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
758        // check nested fields
759        match (dt1, dt2) {
760            (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
761                Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
762                    && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
763            }
764            (DataType::List(f1), DataType::List(f2))
765            | (DataType::LargeList(f1), DataType::LargeList(f2))
766            | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
767                // Don't compare the names of the technical inner field
768                // Usually "item" but that's not mandated
769                Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
770            }
771            (DataType::Map(f1, _), DataType::Map(f2, _)) => {
772                // Don't compare the names of the technical inner fields
773                // Usually "entries", "key", "value" but that's not mandated
774                match (f1.data_type(), f2.data_type()) {
775                    (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
776                        f1_inner.len() == f2_inner.len()
777                            && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
778                                Self::datatype_is_semantically_equal(
779                                    f1.data_type(),
780                                    f2.data_type(),
781                                )
782                            })
783                    }
784                    _ => panic!("Map type should have an inner struct field"),
785                }
786            }
787            (DataType::Struct(fields1), DataType::Struct(fields2)) => {
788                let iter1 = fields1.iter();
789                let iter2 = fields2.iter();
790                fields1.len() == fields2.len() &&
791                        // all fields have to be the same
792                    iter1
793                    .zip(iter2)
794                        .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
795            }
796            (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
797                let iter1 = fields1.iter();
798                let iter2 = fields2.iter();
799                fields1.len() == fields2.len() &&
800                    // all fields have to be the same
801                    iter1
802                        .zip(iter2)
803                        .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
804            }
805            (
806                DataType::Decimal32(_l_precision, _l_scale),
807                DataType::Decimal32(_r_precision, _r_scale),
808            ) => true,
809            (
810                DataType::Decimal64(_l_precision, _l_scale),
811                DataType::Decimal64(_r_precision, _r_scale),
812            ) => true,
813            (
814                DataType::Decimal128(_l_precision, _l_scale),
815                DataType::Decimal128(_r_precision, _r_scale),
816            ) => true,
817            (
818                DataType::Decimal256(_l_precision, _l_scale),
819                DataType::Decimal256(_r_precision, _r_scale),
820            ) => true,
821            (
822                DataType::Timestamp(_l_time_unit, _l_timezone),
823                DataType::Timestamp(_r_time_unit, _r_timezone),
824            ) => true,
825            _ => dt1 == dt2,
826        }
827    }
828
829    fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
830        f1.name() == f2.name()
831            && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
832    }
833
834    fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
835        f1.name() == f2.name()
836            && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
837    }
838
839    /// Strip all field qualifier in schema
840    pub fn strip_qualifiers(self) -> Self {
841        DFSchema {
842            field_qualifiers: vec![None; self.inner.fields.len()],
843            inner: self.inner,
844            functional_dependencies: self.functional_dependencies,
845        }
846    }
847
848    /// Replace all field qualifier with new value in schema
849    pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
850        let qualifier = qualifier.into();
851        DFSchema {
852            field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
853            inner: self.inner,
854            functional_dependencies: self.functional_dependencies,
855        }
856    }
857
858    /// Get list of fully-qualified field names in this schema
859    pub fn field_names(&self) -> Vec<String> {
860        self.iter()
861            .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
862            .collect::<Vec<_>>()
863    }
864
865    /// Get metadata of this schema
866    pub fn metadata(&self) -> &HashMap<String, String> {
867        &self.inner.metadata
868    }
869
870    /// Get functional dependencies
871    pub fn functional_dependencies(&self) -> &FunctionalDependencies {
872        &self.functional_dependencies
873    }
874
875    /// Get functional dependencies
876    pub fn field_qualifiers(&self) -> &[Option<TableReference>] {
877        &self.field_qualifiers
878    }
879
880    /// Iterate over the qualifiers and fields in the DFSchema
881    pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
882        self.field_qualifiers
883            .iter()
884            .zip(self.inner.fields().iter())
885            .map(|(qualifier, field)| (qualifier.as_ref(), field))
886    }
887    /// Returns a tree-like string representation of the schema.
888    ///
889    /// This method formats the schema
890    /// with a tree-like structure showing field names, types, and nullability.
891    ///
892    /// # Example
893    ///
894    /// ```
895    /// use arrow::datatypes::{DataType, Field, Schema};
896    /// use datafusion_common::DFSchema;
897    /// use std::collections::HashMap;
898    ///
899    /// let schema = DFSchema::from_unqualified_fields(
900    ///     vec![
901    ///         Field::new("id", DataType::Int32, false),
902    ///         Field::new("name", DataType::Utf8, true),
903    ///     ]
904    ///     .into(),
905    ///     HashMap::new(),
906    /// )
907    /// .unwrap();
908    ///
909    /// assert_eq!(
910    ///     schema.tree_string().to_string(),
911    ///     r#"root
912    ///  |-- id: int32 (nullable = false)
913    ///  |-- name: utf8 (nullable = true)"#
914    /// );
915    /// ```
916    pub fn tree_string(&self) -> impl Display + '_ {
917        let mut result = String::from("root\n");
918
919        for (qualifier, field) in self.iter() {
920            let field_name = match qualifier {
921                Some(q) => format!("{}.{}", q, field.name()),
922                None => field.name().to_string(),
923            };
924
925            format_field_with_indent(
926                &mut result,
927                &field_name,
928                field.data_type(),
929                field.is_nullable(),
930                " ",
931            );
932        }
933
934        // Remove the trailing newline
935        if result.ends_with('\n') {
936            result.pop();
937        }
938
939        result
940    }
941}
942
943/// Format field with proper nested indentation for complex types
944fn format_field_with_indent(
945    result: &mut String,
946    field_name: &str,
947    data_type: &DataType,
948    nullable: bool,
949    indent: &str,
950) {
951    let nullable_str = nullable.to_string().to_lowercase();
952    let child_indent = format!("{indent}|    ");
953
954    match data_type {
955        DataType::List(field) => {
956            result.push_str(&format!(
957                "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
958            ));
959            format_field_with_indent(
960                result,
961                field.name(),
962                field.data_type(),
963                field.is_nullable(),
964                &child_indent,
965            );
966        }
967        DataType::LargeList(field) => {
968            result.push_str(&format!(
969                "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
970            ));
971            format_field_with_indent(
972                result,
973                field.name(),
974                field.data_type(),
975                field.is_nullable(),
976                &child_indent,
977            );
978        }
979        DataType::FixedSizeList(field, _size) => {
980            result.push_str(&format!(
981                "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
982            ));
983            format_field_with_indent(
984                result,
985                field.name(),
986                field.data_type(),
987                field.is_nullable(),
988                &child_indent,
989            );
990        }
991        DataType::Map(field, _) => {
992            result.push_str(&format!(
993                "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
994            ));
995            if let DataType::Struct(inner_fields) = field.data_type() {
996                if inner_fields.len() == 2 {
997                    format_field_with_indent(
998                        result,
999                        "key",
1000                        inner_fields[0].data_type(),
1001                        inner_fields[0].is_nullable(),
1002                        &child_indent,
1003                    );
1004                    let value_contains_null =
1005                        field.is_nullable().to_string().to_lowercase();
1006                    // Handle complex value types properly
1007                    match inner_fields[1].data_type() {
1008                        DataType::Struct(_)
1009                        | DataType::List(_)
1010                        | DataType::LargeList(_)
1011                        | DataType::FixedSizeList(_, _)
1012                        | DataType::Map(_, _) => {
1013                            format_field_with_indent(
1014                                result,
1015                                "value",
1016                                inner_fields[1].data_type(),
1017                                inner_fields[1].is_nullable(),
1018                                &child_indent,
1019                            );
1020                        }
1021                        _ => {
1022                            result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1023                                format_simple_data_type(inner_fields[1].data_type())));
1024                        }
1025                    }
1026                }
1027            }
1028        }
1029        DataType::Struct(fields) => {
1030            result.push_str(&format!(
1031                "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1032            ));
1033            for struct_field in fields {
1034                format_field_with_indent(
1035                    result,
1036                    struct_field.name(),
1037                    struct_field.data_type(),
1038                    struct_field.is_nullable(),
1039                    &child_indent,
1040                );
1041            }
1042        }
1043        _ => {
1044            let type_str = format_simple_data_type(data_type);
1045            result.push_str(&format!(
1046                "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1047            ));
1048        }
1049    }
1050}
1051
1052/// Format simple DataType in lowercase format (for leaf nodes)
1053fn format_simple_data_type(data_type: &DataType) -> String {
1054    match data_type {
1055        DataType::Boolean => "boolean".to_string(),
1056        DataType::Int8 => "int8".to_string(),
1057        DataType::Int16 => "int16".to_string(),
1058        DataType::Int32 => "int32".to_string(),
1059        DataType::Int64 => "int64".to_string(),
1060        DataType::UInt8 => "uint8".to_string(),
1061        DataType::UInt16 => "uint16".to_string(),
1062        DataType::UInt32 => "uint32".to_string(),
1063        DataType::UInt64 => "uint64".to_string(),
1064        DataType::Float16 => "float16".to_string(),
1065        DataType::Float32 => "float32".to_string(),
1066        DataType::Float64 => "float64".to_string(),
1067        DataType::Utf8 => "utf8".to_string(),
1068        DataType::LargeUtf8 => "large_utf8".to_string(),
1069        DataType::Binary => "binary".to_string(),
1070        DataType::LargeBinary => "large_binary".to_string(),
1071        DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1072        DataType::Date32 => "date32".to_string(),
1073        DataType::Date64 => "date64".to_string(),
1074        DataType::Time32(_) => "time32".to_string(),
1075        DataType::Time64(_) => "time64".to_string(),
1076        DataType::Timestamp(_, tz) => match tz {
1077            Some(tz_str) => format!("timestamp ({tz_str})"),
1078            None => "timestamp".to_string(),
1079        },
1080        DataType::Interval(_) => "interval".to_string(),
1081        DataType::Dictionary(_, value_type) => {
1082            format_simple_data_type(value_type.as_ref())
1083        }
1084        DataType::Decimal32(precision, scale) => {
1085            format!("decimal32({precision}, {scale})")
1086        }
1087        DataType::Decimal64(precision, scale) => {
1088            format!("decimal64({precision}, {scale})")
1089        }
1090        DataType::Decimal128(precision, scale) => {
1091            format!("decimal128({precision}, {scale})")
1092        }
1093        DataType::Decimal256(precision, scale) => {
1094            format!("decimal256({precision}, {scale})")
1095        }
1096        DataType::Null => "null".to_string(),
1097        _ => format!("{data_type}").to_lowercase(),
1098    }
1099}
1100
1101/// Allow DFSchema to be converted into an Arrow `&Schema`
1102impl AsRef<Schema> for DFSchema {
1103    fn as_ref(&self) -> &Schema {
1104        self.as_arrow()
1105    }
1106}
1107
1108/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
1109/// example)
1110impl AsRef<SchemaRef> for DFSchema {
1111    fn as_ref(&self) -> &SchemaRef {
1112        self.inner()
1113    }
1114}
1115
1116/// Create a `DFSchema` from an Arrow schema
1117impl TryFrom<Schema> for DFSchema {
1118    type Error = DataFusionError;
1119    fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1120        Self::try_from(Arc::new(schema))
1121    }
1122}
1123
1124impl TryFrom<SchemaRef> for DFSchema {
1125    type Error = DataFusionError;
1126    fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1127        let field_count = schema.fields.len();
1128        let dfschema = Self {
1129            inner: schema,
1130            field_qualifiers: vec![None; field_count],
1131            functional_dependencies: FunctionalDependencies::empty(),
1132        };
1133        // Without checking names, because schema here may have duplicate field names.
1134        // For example, Partial AggregateMode will generate duplicate field names from
1135        // state_fields.
1136        // See <https://github.com/apache/datafusion/issues/17715>
1137        // dfschema.check_names()?;
1138        Ok(dfschema)
1139    }
1140}
1141
1142// Hashing refers to a subset of fields considered in PartialEq.
1143impl Hash for DFSchema {
1144    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1145        self.inner.fields.hash(state);
1146        self.inner.metadata.len().hash(state); // HashMap is not hashable
1147    }
1148}
1149
1150/// Convenience trait to convert Schema like things to DFSchema and DFSchemaRef with fewer keystrokes
1151pub trait ToDFSchema
1152where
1153    Self: Sized,
1154{
1155    /// Attempt to create a DSSchema
1156    fn to_dfschema(self) -> Result<DFSchema>;
1157
1158    /// Attempt to create a DSSchemaRef
1159    fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1160        Ok(Arc::new(self.to_dfschema()?))
1161    }
1162}
1163
1164impl ToDFSchema for Schema {
1165    fn to_dfschema(self) -> Result<DFSchema> {
1166        DFSchema::try_from(self)
1167    }
1168}
1169
1170impl ToDFSchema for SchemaRef {
1171    fn to_dfschema(self) -> Result<DFSchema> {
1172        DFSchema::try_from(self)
1173    }
1174}
1175
1176impl ToDFSchema for Vec<Field> {
1177    fn to_dfschema(self) -> Result<DFSchema> {
1178        let field_count = self.len();
1179        let schema = Schema {
1180            fields: self.into(),
1181            metadata: HashMap::new(),
1182        };
1183        let dfschema = DFSchema {
1184            inner: schema.into(),
1185            field_qualifiers: vec![None; field_count],
1186            functional_dependencies: FunctionalDependencies::empty(),
1187        };
1188        Ok(dfschema)
1189    }
1190}
1191
1192impl Display for DFSchema {
1193    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1194        write!(
1195            f,
1196            "fields:[{}], metadata:{:?}",
1197            self.iter()
1198                .map(|(q, f)| qualified_name(q, f.name()))
1199                .collect::<Vec<String>>()
1200                .join(", "),
1201            self.inner.metadata
1202        )
1203    }
1204}
1205
1206/// Provides schema information needed by certain methods of `Expr`
1207/// (defined in the datafusion-common crate).
1208///
1209/// Note that this trait is implemented for &[DFSchema] which is
1210/// widely used in the DataFusion codebase.
1211pub trait ExprSchema: std::fmt::Debug {
1212    /// Is this column reference nullable?
1213    fn nullable(&self, col: &Column) -> Result<bool> {
1214        Ok(self.field_from_column(col)?.is_nullable())
1215    }
1216
1217    /// What is the datatype of this column?
1218    fn data_type(&self, col: &Column) -> Result<&DataType> {
1219        Ok(self.field_from_column(col)?.data_type())
1220    }
1221
1222    /// Returns the column's optional metadata.
1223    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1224        Ok(self.field_from_column(col)?.metadata())
1225    }
1226
1227    /// Return the column's datatype and nullability
1228    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1229        let field = self.field_from_column(col)?;
1230        Ok((field.data_type(), field.is_nullable()))
1231    }
1232
1233    // Return the column's field
1234    fn field_from_column(&self, col: &Column) -> Result<&Field>;
1235}
1236
1237// Implement `ExprSchema` for `Arc<DFSchema>`
1238impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1239    fn nullable(&self, col: &Column) -> Result<bool> {
1240        self.as_ref().nullable(col)
1241    }
1242
1243    fn data_type(&self, col: &Column) -> Result<&DataType> {
1244        self.as_ref().data_type(col)
1245    }
1246
1247    fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1248        ExprSchema::metadata(self.as_ref(), col)
1249    }
1250
1251    fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1252        self.as_ref().data_type_and_nullable(col)
1253    }
1254
1255    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1256        self.as_ref().field_from_column(col)
1257    }
1258}
1259
1260impl ExprSchema for DFSchema {
1261    fn field_from_column(&self, col: &Column) -> Result<&Field> {
1262        match &col.relation {
1263            Some(r) => self.field_with_qualified_name(r, &col.name),
1264            None => self.field_with_unqualified_name(&col.name),
1265        }
1266    }
1267}
1268
/// DataFusion-specific extensions to [`Schema`].
pub trait SchemaExt {
    /// This is a specialized version of Eq that ignores differences
    /// in nullability and metadata.
    ///
    /// It works the same as [`DFSchema::equivalent_names_and_types`].
    fn equivalent_names_and_types(&self, other: &Self) -> bool;

    /// Returns `Ok(())` if the two schemas have the same number of fields,
    /// with pairwise matching names and logically equivalent data types.
    /// Returns an error otherwise.
    ///
    /// Use [`DFSchema::equivalent_names_and_types`] for stricter semantic type
    /// equivalence checking.
    ///
    /// It is only used by insert into cases.
    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
}
1286
1287impl SchemaExt for Schema {
1288    fn equivalent_names_and_types(&self, other: &Self) -> bool {
1289        if self.fields().len() != other.fields().len() {
1290            return false;
1291        }
1292
1293        self.fields()
1294            .iter()
1295            .zip(other.fields().iter())
1296            .all(|(f1, f2)| {
1297                f1.name() == f2.name()
1298                    && DFSchema::datatype_is_semantically_equal(
1299                        f1.data_type(),
1300                        f2.data_type(),
1301                    )
1302            })
1303    }
1304
1305    // It is only used by insert into cases.
1306    fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1307        // case 1 : schema length mismatch
1308        if self.fields().len() != other.fields().len() {
1309            _plan_err!(
1310                "Inserting query must have the same schema length as the table. \
1311            Expected table schema length: {}, got: {}",
1312                self.fields().len(),
1313                other.fields().len()
1314            )
1315        } else {
1316            // case 2 : schema length match, but fields mismatch
1317            // check if the fields name are the same and have the same data types
1318            self.fields()
1319                .iter()
1320                .zip(other.fields().iter())
1321                .try_for_each(|(f1, f2)| {
1322                    if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1323                        _plan_err!(
1324                            "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1325                            but got '{}' with type {}.",
1326                            f1.name(),
1327                            f1.data_type(),
1328                            f2.name(),
1329                            f2.data_type())
1330                    } else {
1331                        Ok(())
1332                    }
1333                })
1334        }
1335    }
1336}
1337
1338pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1339    match qualifier {
1340        Some(q) => format!("{q}.{name}"),
1341        None => name.to_string(),
1342    }
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347    use crate::assert_contains;
1348
1349    use super::*;
1350
#[test]
fn qualifier_in_name() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    // "t1.c0" is looked up as a single unqualified name, so it is not found
    let unqualified = Column::from_name("t1.c0");
    let message = schema
        .index_of_column(&unqualified)
        .unwrap_err()
        .strip_backtrace();
    assert_eq!(
        message,
        "Schema error: No field named \"t1.c0\". \
            Column names are case sensitive. \
            You can use double quotes to refer to the \"\"t1.c0\"\" column \
            or set the datafusion.sql_parser.enable_ident_normalization configuration. \
            Did you mean 't1.c0'?."
    );
    Ok(())
}
1365
#[test]
fn quoted_qualifiers_in_name() -> Result<()> {
    // field names that need quoting when listed in the error message
    let arrow_schema = Schema::new(vec![
        Field::new("CapitalColumn", DataType::Boolean, true),
        Field::new("field.with.period", DataType::Boolean, true),
    ]);
    let schema = DFSchema::try_from_qualified_schema("t1", &arrow_schema)?;

    // "t1.c0" is looked up as a single unqualified name
    let unqualified = Column::from_name("t1.c0");
    let message = schema
        .index_of_column(&unqualified)
        .unwrap_err()
        .strip_backtrace();
    assert_eq!(
        message,
        "Schema error: No field named \"t1.c0\". \
            Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\"."
    );
    Ok(())
}
1384
#[test]
fn from_unqualified_schema() -> Result<()> {
    // fields converted from a plain Arrow schema carry no qualifier
    let rendered = DFSchema::try_from(test_schema_1())?.to_string();
    assert_eq!("fields:[c0, c1], metadata:{}", rendered);
    Ok(())
}
1391
#[test]
fn from_qualified_schema() -> Result<()> {
    // every field picks up the given qualifier
    let rendered =
        DFSchema::try_from_qualified_schema("t1", &test_schema_1())?.to_string();
    assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", rendered);
    Ok(())
}
1398
#[test]
fn test_from_field_specific_qualified_schema() -> Result<()> {
    let arrow_schema = Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Boolean, true),
        Field::new("c1", DataType::Boolean, true),
    ]));
    // only the first field is qualified
    let schema = DFSchema::from_field_specific_qualified_schema(
        vec![Some("t1".into()), None],
        &arrow_schema,
    )?;
    assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
1411
#[test]
fn test_from_qualified_fields() -> Result<()> {
    // one qualified and one unqualified field
    let qualified_c0 = (
        Some(TableReference::from("t0")),
        Arc::new(Field::new("c0", DataType::Boolean, true)),
    );
    let bare_c1 = (None, Arc::new(Field::new("c1", DataType::Boolean, true)));

    let schema =
        DFSchema::new_with_metadata(vec![qualified_c0, bare_c1], HashMap::new())?;
    assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
    Ok(())
}
1427
#[test]
fn from_qualified_schema_into_arrow_schema() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    // the Arrow view of the schema shows no qualifiers
    let rendered = schema.as_arrow().to_string();
    insta::assert_snapshot!(rendered, @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
    Ok(())
}
1435
#[test]
fn join_qualified() -> Result<()> {
    let t1 = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let t2 = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
    let joined = t1.join(&t2)?;
    assert_eq!(
        "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
        joined.to_string()
    );
    // test valid access
    for table in ["t1", "t2"] {
        assert!(joined
            .field_with_qualified_name(&TableReference::bare(table), "c0")
            .is_ok());
    }
    // test invalid access
    for name in ["c0", "t1.c0", "t2.c0"] {
        assert!(joined.field_with_unqualified_name(name).is_err());
    }
    Ok(())
}
1458
#[test]
fn join_qualified_duplicate() -> Result<()> {
    // both sides use the same qualifier and the same field names
    let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let error = left.join(&right).unwrap_err();
    assert_eq!(
        error.strip_backtrace(),
        "Schema error: Schema contains duplicate qualified field name t1.c0",
    );
    Ok(())
}
1470
#[test]
fn join_unqualified_duplicate() -> Result<()> {
    // both sides are unqualified and use the same field names
    let left = DFSchema::try_from(test_schema_1())?;
    let right = DFSchema::try_from(test_schema_1())?;
    let error = left.join(&right).unwrap_err();
    assert_eq!(
        error.strip_backtrace(),
        "Schema error: Schema contains duplicate unqualified field name c0"
    );
    Ok(())
}
1482
#[test]
fn join_mixed() -> Result<()> {
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let unqualified = DFSchema::try_from(test_schema_2())?;
    let joined = qualified.join(&unqualified)?;
    assert_eq!(
        "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
        joined.to_string()
    );
    // test valid access
    assert!(joined
        .field_with_qualified_name(&TableReference::bare("t1"), "c0")
        .is_ok());
    for name in ["c0", "c100"] {
        assert!(joined.field_with_unqualified_name(name).is_ok());
    }
    assert!(joined.field_with_name(None, "c100").is_ok());
    // test invalid access
    for name in ["t1.c0", "t1.c100"] {
        assert!(joined.field_with_unqualified_name(name).is_err());
    }
    assert!(joined
        .field_with_qualified_name(&TableReference::bare(""), "c100")
        .is_err());
    Ok(())
}
1507
#[test]
fn join_mixed_duplicate() -> Result<()> {
    // a qualified `t1.c0` joined with an unqualified `c0` is rejected
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let unqualified = DFSchema::try_from(test_schema_1())?;
    let message = qualified.join(&unqualified).unwrap_err().to_string();
    assert_contains!(
        message,
        "Schema error: Schema contains qualified \
                          field name t1.c0 and unqualified field name c0 which would be ambiguous"
    );
    Ok(())
}
1518
#[test]
fn helpful_error_messages() -> Result<()> {
    let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let expected_help = "Valid fields are t1.c0, t1.c1.";

    // both the qualified and the unqualified lookup list the valid fields
    let qualified_miss = schema
        .field_with_qualified_name(&TableReference::bare("x"), "y")
        .unwrap_err()
        .to_string();
    assert_contains!(qualified_miss, expected_help);

    let unqualified_miss = schema
        .field_with_unqualified_name("y")
        .unwrap_err()
        .to_string();
    assert_contains!(unqualified_miss, expected_help);

    // index lookups without a qualifier simply find nothing
    for name in ["y", "t1.c0"] {
        assert!(schema.index_of_column_by_name(None, name).is_none());
    }

    Ok(())
}
1542
#[test]
fn select_without_valid_fields() {
    let schema = DFSchema::empty();

    // the same check with and without a qualifier
    let cases = [
        (
            Column::from_qualified_name("t1.c0"),
            "Schema error: No field named t1.c0.",
        ),
        (Column::from_name("c0"), "Schema error: No field named c0."),
    ];
    for (col, expected) in cases {
        let err = schema.index_of_column(&col).unwrap_err();
        assert_eq!(err.strip_backtrace(), expected);
    }
}
1558
1559    #[test]
1560    fn into() {
1561        // Demonstrate how to convert back and forth between Schema, SchemaRef, DFSchema, and DFSchemaRef
1562        let arrow_schema = Schema::new_with_metadata(
1563            vec![Field::new("c0", DataType::Int64, true)],
1564            test_metadata(),
1565        );
1566        let arrow_schema_ref = Arc::new(arrow_schema.clone());
1567
1568        let df_schema = DFSchema {
1569            inner: Arc::clone(&arrow_schema_ref),
1570            field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1571            functional_dependencies: FunctionalDependencies::empty(),
1572        };
1573        let df_schema_ref = Arc::new(df_schema.clone());
1574
1575        {
1576            let arrow_schema = arrow_schema.clone();
1577            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1578
1579            assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1580            assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1581        }
1582
1583        {
1584            let arrow_schema = arrow_schema.clone();
1585            let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1586
1587            assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1588            assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1589        }
1590
1591        // Now, consume the refs
1592        assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1593        assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1594    }
1595
1596    fn test_schema_1() -> Schema {
1597        Schema::new(vec![
1598            Field::new("c0", DataType::Boolean, true),
1599            Field::new("c1", DataType::Boolean, true),
1600        ])
1601    }
1602    #[test]
1603    fn test_dfschema_to_schema_conversion() {
1604        let mut a_metadata = HashMap::new();
1605        a_metadata.insert("key".to_string(), "value".to_string());
1606        let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1607
1608        let mut b_metadata = HashMap::new();
1609        b_metadata.insert("key".to_string(), "value".to_string());
1610        let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1611
1612        let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1613
1614        let df_schema = DFSchema {
1615            inner: Arc::clone(&schema),
1616            field_qualifiers: vec![None; schema.fields.len()],
1617            functional_dependencies: FunctionalDependencies::empty(),
1618        };
1619
1620        assert_eq!(df_schema.inner.metadata(), schema.metadata())
1621    }
1622
1623    #[test]
1624    fn test_contain_column() -> Result<()> {
1625        // qualified exists
1626        {
1627            let col = Column::from_qualified_name("t1.c0");
1628            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1629            assert!(schema.is_column_from_schema(&col));
1630        }
1631
1632        // qualified not exists
1633        {
1634            let col = Column::from_qualified_name("t1.c2");
1635            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1636            assert!(!schema.is_column_from_schema(&col));
1637        }
1638
1639        // unqualified exists
1640        {
1641            let col = Column::from_name("c0");
1642            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1643            assert!(schema.is_column_from_schema(&col));
1644        }
1645
1646        // unqualified not exists
1647        {
1648            let col = Column::from_name("c2");
1649            let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1650            assert!(!schema.is_column_from_schema(&col));
1651        }
1652
1653        Ok(())
1654    }
1655
1656    #[test]
1657    fn test_datatype_is_logically_equal() {
1658        assert!(DFSchema::datatype_is_logically_equal(
1659            &DataType::Int8,
1660            &DataType::Int8
1661        ));
1662
1663        assert!(!DFSchema::datatype_is_logically_equal(
1664            &DataType::Int8,
1665            &DataType::Int16
1666        ));
1667
1668        // Test lists
1669
1670        // Succeeds if both have the same element type, disregards names and nullability
1671        assert!(DFSchema::datatype_is_logically_equal(
1672            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1673            &DataType::List(Field::new("element", DataType::Int8, false).into())
1674        ));
1675
1676        // Fails if element type is different
1677        assert!(!DFSchema::datatype_is_logically_equal(
1678            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1679            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1680        ));
1681
1682        // Test maps
1683        let map_field = DataType::Map(
1684            Field::new(
1685                "entries",
1686                DataType::Struct(Fields::from(vec![
1687                    Field::new("key", DataType::Int8, false),
1688                    Field::new("value", DataType::Int8, true),
1689                ])),
1690                true,
1691            )
1692            .into(),
1693            true,
1694        );
1695
1696        // Succeeds if both maps have the same key and value types, disregards names and nullability
1697        assert!(DFSchema::datatype_is_logically_equal(
1698            &map_field,
1699            &DataType::Map(
1700                Field::new(
1701                    "pairs",
1702                    DataType::Struct(Fields::from(vec![
1703                        Field::new("one", DataType::Int8, false),
1704                        Field::new("two", DataType::Int8, false)
1705                    ])),
1706                    true
1707                )
1708                .into(),
1709                true
1710            )
1711        ));
1712        // Fails if value type is different
1713        assert!(!DFSchema::datatype_is_logically_equal(
1714            &map_field,
1715            &DataType::Map(
1716                Field::new(
1717                    "entries",
1718                    DataType::Struct(Fields::from(vec![
1719                        Field::new("key", DataType::Int8, false),
1720                        Field::new("value", DataType::Int16, true)
1721                    ])),
1722                    true
1723                )
1724                .into(),
1725                true
1726            )
1727        ));
1728
1729        // Fails if key type is different
1730        assert!(!DFSchema::datatype_is_logically_equal(
1731            &map_field,
1732            &DataType::Map(
1733                Field::new(
1734                    "entries",
1735                    DataType::Struct(Fields::from(vec![
1736                        Field::new("key", DataType::Int16, false),
1737                        Field::new("value", DataType::Int8, true)
1738                    ])),
1739                    true
1740                )
1741                .into(),
1742                true
1743            )
1744        ));
1745
1746        // Test structs
1747
1748        let struct_field = DataType::Struct(Fields::from(vec![
1749            Field::new("a", DataType::Int8, true),
1750            Field::new("b", DataType::Int8, true),
1751        ]));
1752
1753        // Succeeds if both have same names and datatypes, ignores nullability
1754        assert!(DFSchema::datatype_is_logically_equal(
1755            &struct_field,
1756            &DataType::Struct(Fields::from(vec![
1757                Field::new("a", DataType::Int8, false),
1758                Field::new("b", DataType::Int8, true),
1759            ]))
1760        ));
1761
1762        // Fails if field names are different
1763        assert!(!DFSchema::datatype_is_logically_equal(
1764            &struct_field,
1765            &DataType::Struct(Fields::from(vec![
1766                Field::new("x", DataType::Int8, true),
1767                Field::new("y", DataType::Int8, true),
1768            ]))
1769        ));
1770
1771        // Fails if types are different
1772        assert!(!DFSchema::datatype_is_logically_equal(
1773            &struct_field,
1774            &DataType::Struct(Fields::from(vec![
1775                Field::new("a", DataType::Int16, true),
1776                Field::new("b", DataType::Int8, true),
1777            ]))
1778        ));
1779
1780        // Fails if more or less fields
1781        assert!(!DFSchema::datatype_is_logically_equal(
1782            &struct_field,
1783            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1784        ));
1785    }
1786
1787    #[test]
1788    fn test_datatype_is_logically_equivalent_to_dictionary() {
1789        // Dictionary is logically equal to its value type
1790        assert!(DFSchema::datatype_is_logically_equal(
1791            &DataType::Utf8,
1792            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1793        ));
1794    }
1795
1796    #[test]
1797    fn test_datatype_is_semantically_equal() {
1798        assert!(DFSchema::datatype_is_semantically_equal(
1799            &DataType::Int8,
1800            &DataType::Int8
1801        ));
1802
1803        assert!(!DFSchema::datatype_is_semantically_equal(
1804            &DataType::Int8,
1805            &DataType::Int16
1806        ));
1807
1808        // Succeeds if decimal precision and scale are different
1809        assert!(DFSchema::datatype_is_semantically_equal(
1810            &DataType::Decimal32(1, 2),
1811            &DataType::Decimal32(2, 1),
1812        ));
1813
1814        assert!(DFSchema::datatype_is_semantically_equal(
1815            &DataType::Decimal64(1, 2),
1816            &DataType::Decimal64(2, 1),
1817        ));
1818
1819        assert!(DFSchema::datatype_is_semantically_equal(
1820            &DataType::Decimal128(1, 2),
1821            &DataType::Decimal128(2, 1),
1822        ));
1823
1824        assert!(DFSchema::datatype_is_semantically_equal(
1825            &DataType::Decimal256(1, 2),
1826            &DataType::Decimal256(2, 1),
1827        ));
1828
1829        // Any two timestamp types should match
1830        assert!(DFSchema::datatype_is_semantically_equal(
1831            &DataType::Timestamp(
1832                arrow::datatypes::TimeUnit::Microsecond,
1833                Some("UTC".into())
1834            ),
1835            &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1836        ));
1837
1838        // Test lists
1839
1840        // Succeeds if both have the same element type, disregards names and nullability
1841        assert!(DFSchema::datatype_is_semantically_equal(
1842            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1843            &DataType::List(Field::new("element", DataType::Int8, false).into())
1844        ));
1845
1846        // Fails if element type is different
1847        assert!(!DFSchema::datatype_is_semantically_equal(
1848            &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1849            &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1850        ));
1851
1852        // Test maps
1853        let map_field = DataType::Map(
1854            Field::new(
1855                "entries",
1856                DataType::Struct(Fields::from(vec![
1857                    Field::new("key", DataType::Int8, false),
1858                    Field::new("value", DataType::Int8, true),
1859                ])),
1860                true,
1861            )
1862            .into(),
1863            true,
1864        );
1865
1866        // Succeeds if both maps have the same key and value types, disregards names and nullability
1867        assert!(DFSchema::datatype_is_semantically_equal(
1868            &map_field,
1869            &DataType::Map(
1870                Field::new(
1871                    "pairs",
1872                    DataType::Struct(Fields::from(vec![
1873                        Field::new("one", DataType::Int8, false),
1874                        Field::new("two", DataType::Int8, false)
1875                    ])),
1876                    true
1877                )
1878                .into(),
1879                true
1880            )
1881        ));
1882        // Fails if value type is different
1883        assert!(!DFSchema::datatype_is_semantically_equal(
1884            &map_field,
1885            &DataType::Map(
1886                Field::new(
1887                    "entries",
1888                    DataType::Struct(Fields::from(vec![
1889                        Field::new("key", DataType::Int8, false),
1890                        Field::new("value", DataType::Int16, true)
1891                    ])),
1892                    true
1893                )
1894                .into(),
1895                true
1896            )
1897        ));
1898
1899        // Fails if key type is different
1900        assert!(!DFSchema::datatype_is_semantically_equal(
1901            &map_field,
1902            &DataType::Map(
1903                Field::new(
1904                    "entries",
1905                    DataType::Struct(Fields::from(vec![
1906                        Field::new("key", DataType::Int16, false),
1907                        Field::new("value", DataType::Int8, true)
1908                    ])),
1909                    true
1910                )
1911                .into(),
1912                true
1913            )
1914        ));
1915
1916        // Test structs
1917
1918        let struct_field = DataType::Struct(Fields::from(vec![
1919            Field::new("a", DataType::Int8, true),
1920            Field::new("b", DataType::Int8, true),
1921        ]));
1922
1923        // Succeeds if both have same names and datatypes, ignores nullability
1924        assert!(DFSchema::datatype_is_logically_equal(
1925            &struct_field,
1926            &DataType::Struct(Fields::from(vec![
1927                Field::new("a", DataType::Int8, false),
1928                Field::new("b", DataType::Int8, true),
1929            ]))
1930        ));
1931
1932        // Fails if field names are different
1933        assert!(!DFSchema::datatype_is_logically_equal(
1934            &struct_field,
1935            &DataType::Struct(Fields::from(vec![
1936                Field::new("x", DataType::Int8, true),
1937                Field::new("y", DataType::Int8, true),
1938            ]))
1939        ));
1940
1941        // Fails if types are different
1942        assert!(!DFSchema::datatype_is_logically_equal(
1943            &struct_field,
1944            &DataType::Struct(Fields::from(vec![
1945                Field::new("a", DataType::Int16, true),
1946                Field::new("b", DataType::Int8, true),
1947            ]))
1948        ));
1949
1950        // Fails if more or less fields
1951        assert!(!DFSchema::datatype_is_logically_equal(
1952            &struct_field,
1953            &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1954        ));
1955    }
1956
1957    #[test]
1958    fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1959        // Dictionary is not semantically equal to its value type
1960        assert!(!DFSchema::datatype_is_semantically_equal(
1961            &DataType::Utf8,
1962            &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1963        ));
1964    }
1965
1966    fn test_schema_2() -> Schema {
1967        Schema::new(vec![
1968            Field::new("c100", DataType::Boolean, true),
1969            Field::new("c101", DataType::Boolean, true),
1970        ])
1971    }
1972
1973    fn test_metadata() -> HashMap<String, String> {
1974        test_metadata_n(2)
1975    }
1976
1977    fn test_metadata_n(n: usize) -> HashMap<String, String> {
1978        (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1979    }
1980
1981    #[test]
1982    fn test_print_schema_unqualified() {
1983        let schema = DFSchema::from_unqualified_fields(
1984            vec![
1985                Field::new("id", DataType::Int32, false),
1986                Field::new("name", DataType::Utf8, true),
1987                Field::new("age", DataType::Int64, true),
1988                Field::new("active", DataType::Boolean, false),
1989            ]
1990            .into(),
1991            HashMap::new(),
1992        )
1993        .unwrap();
1994
1995        let output = schema.tree_string();
1996
1997        insta::assert_snapshot!(output, @r"
1998        root
1999         |-- id: int32 (nullable = false)
2000         |-- name: utf8 (nullable = true)
2001         |-- age: int64 (nullable = true)
2002         |-- active: boolean (nullable = false)
2003        ");
2004    }
2005
2006    #[test]
2007    fn test_print_schema_qualified() {
2008        let schema = DFSchema::try_from_qualified_schema(
2009            "table1",
2010            &Schema::new(vec![
2011                Field::new("id", DataType::Int32, false),
2012                Field::new("name", DataType::Utf8, true),
2013            ]),
2014        )
2015        .unwrap();
2016
2017        let output = schema.tree_string();
2018
2019        insta::assert_snapshot!(output, @r"
2020        root
2021         |-- table1.id: int32 (nullable = false)
2022         |-- table1.name: utf8 (nullable = true)
2023        ");
2024    }
2025
2026    #[test]
2027    fn test_print_schema_complex_types() {
2028        let struct_field = Field::new(
2029            "address",
2030            DataType::Struct(Fields::from(vec![
2031                Field::new("street", DataType::Utf8, true),
2032                Field::new("city", DataType::Utf8, true),
2033            ])),
2034            true,
2035        );
2036
2037        let list_field = Field::new(
2038            "tags",
2039            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2040            true,
2041        );
2042
2043        let schema = DFSchema::from_unqualified_fields(
2044            vec![
2045                Field::new("id", DataType::Int32, false),
2046                struct_field,
2047                list_field,
2048                Field::new("score", DataType::Decimal128(10, 2), true),
2049            ]
2050            .into(),
2051            HashMap::new(),
2052        )
2053        .unwrap();
2054
2055        let output = schema.tree_string();
2056        insta::assert_snapshot!(output, @r"
2057        root
2058         |-- id: int32 (nullable = false)
2059         |-- address: struct (nullable = true)
2060         |    |-- street: utf8 (nullable = true)
2061         |    |-- city: utf8 (nullable = true)
2062         |-- tags: list (nullable = true)
2063         |    |-- item: utf8 (nullable = true)
2064         |-- score: decimal128(10, 2) (nullable = true)
2065        ");
2066    }
2067
2068    #[test]
2069    fn test_print_schema_empty() {
2070        let schema = DFSchema::empty();
2071        let output = schema.tree_string();
2072        insta::assert_snapshot!(output, @r###"root"###);
2073    }
2074
2075    #[test]
2076    fn test_print_schema_deeply_nested_types() {
2077        // Create a deeply nested structure to test indentation and complex type formatting
2078        let inner_struct = Field::new(
2079            "inner",
2080            DataType::Struct(Fields::from(vec![
2081                Field::new("level1", DataType::Utf8, true),
2082                Field::new("level2", DataType::Int32, false),
2083            ])),
2084            true,
2085        );
2086
2087        let nested_list = Field::new(
2088            "nested_list",
2089            DataType::List(Arc::new(Field::new(
2090                "item",
2091                DataType::Struct(Fields::from(vec![
2092                    Field::new("id", DataType::Int64, false),
2093                    Field::new("value", DataType::Float64, true),
2094                ])),
2095                true,
2096            ))),
2097            true,
2098        );
2099
2100        let map_field = Field::new(
2101            "map_data",
2102            DataType::Map(
2103                Arc::new(Field::new(
2104                    "entries",
2105                    DataType::Struct(Fields::from(vec![
2106                        Field::new("key", DataType::Utf8, false),
2107                        Field::new(
2108                            "value",
2109                            DataType::List(Arc::new(Field::new(
2110                                "item",
2111                                DataType::Int32,
2112                                true,
2113                            ))),
2114                            true,
2115                        ),
2116                    ])),
2117                    false,
2118                )),
2119                false,
2120            ),
2121            true,
2122        );
2123
2124        let schema = DFSchema::from_unqualified_fields(
2125            vec![
2126                Field::new("simple_field", DataType::Utf8, true),
2127                inner_struct,
2128                nested_list,
2129                map_field,
2130                Field::new(
2131                    "timestamp_field",
2132                    DataType::Timestamp(
2133                        arrow::datatypes::TimeUnit::Microsecond,
2134                        Some("UTC".into()),
2135                    ),
2136                    false,
2137                ),
2138            ]
2139            .into(),
2140            HashMap::new(),
2141        )
2142        .unwrap();
2143
2144        let output = schema.tree_string();
2145
2146        insta::assert_snapshot!(output, @r"
2147        root
2148         |-- simple_field: utf8 (nullable = true)
2149         |-- inner: struct (nullable = true)
2150         |    |-- level1: utf8 (nullable = true)
2151         |    |-- level2: int32 (nullable = false)
2152         |-- nested_list: list (nullable = true)
2153         |    |-- item: struct (nullable = true)
2154         |    |    |-- id: int64 (nullable = false)
2155         |    |    |-- value: float64 (nullable = true)
2156         |-- map_data: map (nullable = true)
2157         |    |-- key: utf8 (nullable = false)
2158         |    |-- value: list (nullable = true)
2159         |    |    |-- item: int32 (nullable = true)
2160         |-- timestamp_field: timestamp (UTC) (nullable = false)
2161        ");
2162    }
2163
2164    #[test]
2165    fn test_print_schema_mixed_qualified_unqualified() {
2166        // Test a schema with mixed qualified and unqualified fields
2167        let schema = DFSchema::new_with_metadata(
2168            vec![
2169                (
2170                    Some("table1".into()),
2171                    Arc::new(Field::new("id", DataType::Int32, false)),
2172                ),
2173                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
2174                (
2175                    Some("table2".into()),
2176                    Arc::new(Field::new("score", DataType::Float64, true)),
2177                ),
2178                (
2179                    None,
2180                    Arc::new(Field::new("active", DataType::Boolean, false)),
2181                ),
2182            ],
2183            HashMap::new(),
2184        )
2185        .unwrap();
2186
2187        let output = schema.tree_string();
2188
2189        insta::assert_snapshot!(output, @r"
2190        root
2191         |-- table1.id: int32 (nullable = false)
2192         |-- name: utf8 (nullable = true)
2193         |-- table2.score: float64 (nullable = true)
2194         |-- active: boolean (nullable = false)
2195        ");
2196    }
2197
2198    #[test]
2199    fn test_print_schema_array_of_map() {
2200        // Test the specific example from user feedback: array of map
2201        let map_field = Field::new(
2202            "entries",
2203            DataType::Struct(Fields::from(vec![
2204                Field::new("key", DataType::Utf8, false),
2205                Field::new("value", DataType::Utf8, false),
2206            ])),
2207            false,
2208        );
2209
2210        let array_of_map_field = Field::new(
2211            "array_map_field",
2212            DataType::List(Arc::new(Field::new(
2213                "item",
2214                DataType::Map(Arc::new(map_field), false),
2215                false,
2216            ))),
2217            false,
2218        );
2219
2220        let schema = DFSchema::from_unqualified_fields(
2221            vec![array_of_map_field].into(),
2222            HashMap::new(),
2223        )
2224        .unwrap();
2225
2226        let output = schema.tree_string();
2227
2228        insta::assert_snapshot!(output, @r"
2229        root
2230         |-- array_map_field: list (nullable = false)
2231         |    |-- item: map (nullable = false)
2232         |    |    |-- key: utf8 (nullable = false)
2233         |    |    |-- value: utf8 (nullable = false)
2234        ");
2235    }
2236
2237    #[test]
2238    fn test_print_schema_complex_type_combinations() {
2239        // Test various combinations of list, struct, and map types
2240
2241        // List of structs
2242        let list_of_structs = Field::new(
2243            "list_of_structs",
2244            DataType::List(Arc::new(Field::new(
2245                "item",
2246                DataType::Struct(Fields::from(vec![
2247                    Field::new("id", DataType::Int32, false),
2248                    Field::new("name", DataType::Utf8, true),
2249                    Field::new("score", DataType::Float64, true),
2250                ])),
2251                true,
2252            ))),
2253            true,
2254        );
2255
2256        // Struct containing lists
2257        let struct_with_lists = Field::new(
2258            "struct_with_lists",
2259            DataType::Struct(Fields::from(vec![
2260                Field::new(
2261                    "tags",
2262                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
2263                    true,
2264                ),
2265                Field::new(
2266                    "scores",
2267                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
2268                    false,
2269                ),
2270                Field::new("metadata", DataType::Utf8, true),
2271            ])),
2272            false,
2273        );
2274
2275        // Map with struct values
2276        let map_with_struct_values = Field::new(
2277            "map_with_struct_values",
2278            DataType::Map(
2279                Arc::new(Field::new(
2280                    "entries",
2281                    DataType::Struct(Fields::from(vec![
2282                        Field::new("key", DataType::Utf8, false),
2283                        Field::new(
2284                            "value",
2285                            DataType::Struct(Fields::from(vec![
2286                                Field::new("count", DataType::Int64, false),
2287                                Field::new("active", DataType::Boolean, true),
2288                            ])),
2289                            true,
2290                        ),
2291                    ])),
2292                    false,
2293                )),
2294                false,
2295            ),
2296            true,
2297        );
2298
2299        // List of maps
2300        let list_of_maps = Field::new(
2301            "list_of_maps",
2302            DataType::List(Arc::new(Field::new(
2303                "item",
2304                DataType::Map(
2305                    Arc::new(Field::new(
2306                        "entries",
2307                        DataType::Struct(Fields::from(vec![
2308                            Field::new("key", DataType::Utf8, false),
2309                            Field::new("value", DataType::Int32, true),
2310                        ])),
2311                        false,
2312                    )),
2313                    false,
2314                ),
2315                true,
2316            ))),
2317            true,
2318        );
2319
2320        // Deeply nested: struct containing list of structs containing maps
2321        let deeply_nested = Field::new(
2322            "deeply_nested",
2323            DataType::Struct(Fields::from(vec![
2324                Field::new("level1", DataType::Utf8, true),
2325                Field::new(
2326                    "level2",
2327                    DataType::List(Arc::new(Field::new(
2328                        "item",
2329                        DataType::Struct(Fields::from(vec![
2330                            Field::new("id", DataType::Int32, false),
2331                            Field::new(
2332                                "properties",
2333                                DataType::Map(
2334                                    Arc::new(Field::new(
2335                                        "entries",
2336                                        DataType::Struct(Fields::from(vec![
2337                                            Field::new("key", DataType::Utf8, false),
2338                                            Field::new("value", DataType::Float64, true),
2339                                        ])),
2340                                        false,
2341                                    )),
2342                                    false,
2343                                ),
2344                                true,
2345                            ),
2346                        ])),
2347                        true,
2348                    ))),
2349                    false,
2350                ),
2351            ])),
2352            true,
2353        );
2354
2355        let schema = DFSchema::from_unqualified_fields(
2356            vec![
2357                list_of_structs,
2358                struct_with_lists,
2359                map_with_struct_values,
2360                list_of_maps,
2361                deeply_nested,
2362            ]
2363            .into(),
2364            HashMap::new(),
2365        )
2366        .unwrap();
2367
2368        let output = schema.tree_string();
2369
2370        insta::assert_snapshot!(output, @r"
2371        root
2372         |-- list_of_structs: list (nullable = true)
2373         |    |-- item: struct (nullable = true)
2374         |    |    |-- id: int32 (nullable = false)
2375         |    |    |-- name: utf8 (nullable = true)
2376         |    |    |-- score: float64 (nullable = true)
2377         |-- struct_with_lists: struct (nullable = false)
2378         |    |-- tags: list (nullable = true)
2379         |    |    |-- item: utf8 (nullable = true)
2380         |    |-- scores: list (nullable = false)
2381         |    |    |-- item: int32 (nullable = true)
2382         |    |-- metadata: utf8 (nullable = true)
2383         |-- map_with_struct_values: map (nullable = true)
2384         |    |-- key: utf8 (nullable = false)
2385         |    |-- value: struct (nullable = true)
2386         |    |    |-- count: int64 (nullable = false)
2387         |    |    |-- active: boolean (nullable = true)
2388         |-- list_of_maps: list (nullable = true)
2389         |    |-- item: map (nullable = true)
2390         |    |    |-- key: utf8 (nullable = false)
2391         |    |    |-- value: int32 (nullable = false)
2392         |-- deeply_nested: struct (nullable = true)
2393         |    |-- level1: utf8 (nullable = true)
2394         |    |-- level2: list (nullable = false)
2395         |    |    |-- item: struct (nullable = true)
2396         |    |    |    |-- id: int32 (nullable = false)
2397         |    |    |    |-- properties: map (nullable = true)
2398         |    |    |    |    |-- key: utf8 (nullable = false)
2399         |    |    |    |    |-- value: float64 (nullable = false)
2400        ");
2401    }
2402
2403    #[test]
2404    fn test_print_schema_edge_case_types() {
2405        // Test edge cases and special types
2406        let schema = DFSchema::from_unqualified_fields(
2407            vec![
2408                Field::new("null_field", DataType::Null, true),
2409                Field::new("binary_field", DataType::Binary, false),
2410                Field::new("large_binary", DataType::LargeBinary, true),
2411                Field::new("large_utf8", DataType::LargeUtf8, false),
2412                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
2413                Field::new(
2414                    "fixed_size_list",
2415                    DataType::FixedSizeList(
2416                        Arc::new(Field::new("item", DataType::Int32, true)),
2417                        5,
2418                    ),
2419                    false,
2420                ),
2421                Field::new("decimal32", DataType::Decimal32(9, 4), true),
2422                Field::new("decimal64", DataType::Decimal64(9, 4), true),
2423                Field::new("decimal128", DataType::Decimal128(18, 4), true),
2424                Field::new("decimal256", DataType::Decimal256(38, 10), false),
2425                Field::new("date32", DataType::Date32, true),
2426                Field::new("date64", DataType::Date64, false),
2427                Field::new(
2428                    "time32_seconds",
2429                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
2430                    true,
2431                ),
2432                Field::new(
2433                    "time64_nanoseconds",
2434                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
2435                    false,
2436                ),
2437            ]
2438            .into(),
2439            HashMap::new(),
2440        )
2441        .unwrap();
2442
2443        let output = schema.tree_string();
2444
2445        insta::assert_snapshot!(output, @r"
2446        root
2447         |-- null_field: null (nullable = true)
2448         |-- binary_field: binary (nullable = false)
2449         |-- large_binary: large_binary (nullable = true)
2450         |-- large_utf8: large_utf8 (nullable = false)
2451         |-- fixed_size_binary: fixed_size_binary (nullable = true)
2452         |-- fixed_size_list: fixed size list (nullable = false)
2453         |    |-- item: int32 (nullable = true)
2454         |-- decimal32: decimal32(9, 4) (nullable = true)
2455         |-- decimal64: decimal64(9, 4) (nullable = true)
2456         |-- decimal128: decimal128(18, 4) (nullable = true)
2457         |-- decimal256: decimal256(38, 10) (nullable = false)
2458         |-- date32: date32 (nullable = true)
2459         |-- date64: date64 (nullable = false)
2460         |-- time32_seconds: time32 (nullable = true)
2461         |-- time64_nanoseconds: time64 (nullable = false)
2462        ");
2463    }
2464}