1use std::collections::{BTreeSet, HashMap, HashSet};
22use std::fmt::{Display, Formatter};
23use std::hash::Hash;
24use std::sync::Arc;
25
26use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
27use crate::{
28 field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
29 SchemaError, TableReference,
30};
31
32use arrow::compute::can_cast_types;
33use arrow::datatypes::{
34 DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
35};
36
37pub type DFSchemaRef = Arc<DFSchema>;
39
40#[derive(Debug, Clone, PartialEq, Eq)]
106pub struct DFSchema {
107 inner: SchemaRef,
109 field_qualifiers: Vec<Option<TableReference>>,
112 functional_dependencies: FunctionalDependencies,
114}
115
116impl DFSchema {
117 pub fn empty() -> Self {
119 Self {
120 inner: Arc::new(Schema::new([])),
121 field_qualifiers: vec![],
122 functional_dependencies: FunctionalDependencies::empty(),
123 }
124 }
125
126 pub fn as_arrow(&self) -> &Schema {
130 self.inner.as_ref()
131 }
132
133 pub fn inner(&self) -> &SchemaRef {
137 &self.inner
138 }
139
140 pub fn new_with_metadata(
142 qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
143 metadata: HashMap<String, String>,
144 ) -> Result<Self> {
145 let (qualifiers, fields): (Vec<Option<TableReference>>, Vec<Arc<Field>>) =
146 qualified_fields.into_iter().unzip();
147
148 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
149
150 let dfschema = Self {
151 inner: schema,
152 field_qualifiers: qualifiers,
153 functional_dependencies: FunctionalDependencies::empty(),
154 };
155 dfschema.check_names()?;
156 Ok(dfschema)
157 }
158
159 pub fn from_unqualified_fields(
161 fields: Fields,
162 metadata: HashMap<String, String>,
163 ) -> Result<Self> {
164 let field_count = fields.len();
165 let schema = Arc::new(Schema::new_with_metadata(fields, metadata));
166 let dfschema = Self {
167 inner: schema,
168 field_qualifiers: vec![None; field_count],
169 functional_dependencies: FunctionalDependencies::empty(),
170 };
171 dfschema.check_names()?;
172 Ok(dfschema)
173 }
174
175 pub fn try_from_qualified_schema(
180 qualifier: impl Into<TableReference>,
181 schema: &Schema,
182 ) -> Result<Self> {
183 let qualifier = qualifier.into();
184 let schema = DFSchema {
185 inner: schema.clone().into(),
186 field_qualifiers: vec![Some(qualifier); schema.fields.len()],
187 functional_dependencies: FunctionalDependencies::empty(),
188 };
189 schema.check_names()?;
190 Ok(schema)
191 }
192
193 pub fn from_field_specific_qualified_schema(
195 qualifiers: Vec<Option<TableReference>>,
196 schema: &SchemaRef,
197 ) -> Result<Self> {
198 let dfschema = Self {
199 inner: Arc::clone(schema),
200 field_qualifiers: qualifiers,
201 functional_dependencies: FunctionalDependencies::empty(),
202 };
203 dfschema.check_names()?;
204 Ok(dfschema)
205 }
206
207 pub fn with_field_specific_qualified_schema(
209 &self,
210 qualifiers: Vec<Option<TableReference>>,
211 ) -> Result<Self> {
212 if qualifiers.len() != self.fields().len() {
213 return _plan_err!(
214 "Number of qualifiers must match number of fields. Expected {}, got {}",
215 self.fields().len(),
216 qualifiers.len()
217 );
218 }
219 Ok(DFSchema {
220 inner: Arc::clone(&self.inner),
221 field_qualifiers: qualifiers,
222 functional_dependencies: self.functional_dependencies.clone(),
223 })
224 }
225
226 pub fn check_names(&self) -> Result<()> {
228 let mut qualified_names = BTreeSet::new();
229 let mut unqualified_names = BTreeSet::new();
230
231 for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
232 if let Some(qualifier) = qualifier {
233 if !qualified_names.insert((qualifier, field.name())) {
234 return _schema_err!(SchemaError::DuplicateQualifiedField {
235 qualifier: Box::new(qualifier.clone()),
236 name: field.name().to_string(),
237 });
238 }
239 } else if !unqualified_names.insert(field.name()) {
240 return _schema_err!(SchemaError::DuplicateUnqualifiedField {
241 name: field.name().to_string()
242 });
243 }
244 }
245
246 for (qualifier, name) in qualified_names {
247 if unqualified_names.contains(name) {
248 return _schema_err!(SchemaError::AmbiguousReference {
249 field: Box::new(Column::new(Some(qualifier.clone()), name))
250 });
251 }
252 }
253 Ok(())
254 }
255
256 pub fn with_functional_dependencies(
258 mut self,
259 functional_dependencies: FunctionalDependencies,
260 ) -> Result<Self> {
261 if functional_dependencies.is_valid(self.inner.fields.len()) {
262 self.functional_dependencies = functional_dependencies;
263 Ok(self)
264 } else {
265 _plan_err!(
266 "Invalid functional dependency: {:?}",
267 functional_dependencies
268 )
269 }
270 }
271
272 pub fn join(&self, schema: &DFSchema) -> Result<Self> {
275 let mut schema_builder = SchemaBuilder::new();
276 schema_builder.extend(self.inner.fields().iter().cloned());
277 schema_builder.extend(schema.fields().iter().cloned());
278 let new_schema = schema_builder.finish();
279
280 let mut new_metadata = self.inner.metadata.clone();
281 new_metadata.extend(schema.inner.metadata.clone());
282 let new_schema_with_metadata = new_schema.with_metadata(new_metadata);
283
284 let mut new_qualifiers = self.field_qualifiers.clone();
285 new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());
286
287 let new_self = Self {
288 inner: Arc::new(new_schema_with_metadata),
289 field_qualifiers: new_qualifiers,
290 functional_dependencies: FunctionalDependencies::empty(),
291 };
292 new_self.check_names()?;
293 Ok(new_self)
294 }
295
296 pub fn merge(&mut self, other_schema: &DFSchema) {
313 if other_schema.inner.fields.is_empty() {
314 return;
315 }
316
317 let self_fields: HashSet<(Option<&TableReference>, &str)> = self
318 .iter()
319 .map(|(qualifier, field)| (qualifier, field.name().as_str()))
320 .collect();
321 let self_unqualified_names: HashSet<&str> = self
322 .inner
323 .fields
324 .iter()
325 .map(|field| field.name().as_str())
326 .collect();
327
328 let mut schema_builder = SchemaBuilder::from(self.inner.fields.clone());
329 let mut qualifiers = Vec::new();
330 for (qualifier, field) in other_schema.iter() {
331 let duplicated_field = match qualifier {
333 Some(q) => {
334 self_fields.contains(&(Some(q), field.name().as_str()))
335 || self_fields.contains(&(None, field.name().as_str()))
336 }
337 None => self_unqualified_names.contains(field.name().as_str()),
339 };
340 if !duplicated_field {
341 schema_builder.push(Arc::clone(field));
342 qualifiers.push(qualifier.cloned());
343 }
344 }
345 let mut metadata = self.inner.metadata.clone();
346 metadata.extend(other_schema.inner.metadata.clone());
347
348 let finished = schema_builder.finish();
349 let finished_with_metadata = finished.with_metadata(metadata);
350 self.inner = finished_with_metadata.into();
351 self.field_qualifiers.extend(qualifiers);
352 }
353
354 pub fn fields(&self) -> &Fields {
356 &self.inner.fields
357 }
358
359 pub fn field(&self, i: usize) -> &Field {
362 &self.inner.fields[i]
363 }
364
365 pub fn qualified_field(&self, i: usize) -> (Option<&TableReference>, &Field) {
368 (self.field_qualifiers[i].as_ref(), self.field(i))
369 }
370
371 pub fn index_of_column_by_name(
372 &self,
373 qualifier: Option<&TableReference>,
374 name: &str,
375 ) -> Option<usize> {
376 let mut matches = self
377 .iter()
378 .enumerate()
379 .filter(|(_, (q, f))| match (qualifier, q) {
380 (Some(q), Some(field_q)) => q.resolved_eq(field_q) && f.name() == name,
384 (Some(_), None) => false,
386 (None, Some(_)) | (None, None) => f.name() == name,
388 })
389 .map(|(idx, _)| idx);
390 matches.next()
391 }
392
393 pub fn maybe_index_of_column(&self, col: &Column) -> Option<usize> {
399 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
400 }
401
402 pub fn index_of_column(&self, col: &Column) -> Result<usize> {
408 self.maybe_index_of_column(col)
409 .ok_or_else(|| field_not_found(col.relation.clone(), &col.name, self))
410 }
411
412 pub fn is_column_from_schema(&self, col: &Column) -> bool {
414 self.index_of_column_by_name(col.relation.as_ref(), &col.name)
415 .is_some()
416 }
417
418 pub fn field_with_name(
420 &self,
421 qualifier: Option<&TableReference>,
422 name: &str,
423 ) -> Result<&Field> {
424 if let Some(qualifier) = qualifier {
425 self.field_with_qualified_name(qualifier, name)
426 } else {
427 self.field_with_unqualified_name(name)
428 }
429 }
430
431 pub fn qualified_field_with_name(
433 &self,
434 qualifier: Option<&TableReference>,
435 name: &str,
436 ) -> Result<(Option<&TableReference>, &Field)> {
437 if let Some(qualifier) = qualifier {
438 let idx = self
439 .index_of_column_by_name(Some(qualifier), name)
440 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
441 Ok((self.field_qualifiers[idx].as_ref(), self.field(idx)))
442 } else {
443 self.qualified_field_with_unqualified_name(name)
444 }
445 }
446
447 pub fn fields_with_qualified(&self, qualifier: &TableReference) -> Vec<&Field> {
449 self.iter()
450 .filter(|(q, _)| q.map(|q| q.eq(qualifier)).unwrap_or(false))
451 .map(|(_, f)| f.as_ref())
452 .collect()
453 }
454
455 pub fn fields_indices_with_qualified(
457 &self,
458 qualifier: &TableReference,
459 ) -> Vec<usize> {
460 self.iter()
461 .enumerate()
462 .filter_map(|(idx, (q, _))| q.and_then(|q| q.eq(qualifier).then_some(idx)))
463 .collect()
464 }
465
466 pub fn fields_with_unqualified_name(&self, name: &str) -> Vec<&Field> {
468 self.fields()
469 .iter()
470 .filter(|field| field.name() == name)
471 .map(|f| f.as_ref())
472 .collect()
473 }
474
475 pub fn qualified_fields_with_unqualified_name(
477 &self,
478 name: &str,
479 ) -> Vec<(Option<&TableReference>, &Field)> {
480 self.iter()
481 .filter(|(_, field)| field.name() == name)
482 .map(|(qualifier, field)| (qualifier, field.as_ref()))
483 .collect()
484 }
485
486 pub fn columns_with_unqualified_name(&self, name: &str) -> Vec<Column> {
488 self.iter()
489 .filter(|(_, field)| field.name() == name)
490 .map(|(qualifier, field)| Column::new(qualifier.cloned(), field.name()))
491 .collect()
492 }
493
494 pub fn columns(&self) -> Vec<Column> {
496 self.iter()
497 .map(|(qualifier, field)| {
498 Column::new(qualifier.cloned(), field.name().clone())
499 })
500 .collect()
501 }
502
503 pub fn qualified_field_with_unqualified_name(
505 &self,
506 name: &str,
507 ) -> Result<(Option<&TableReference>, &Field)> {
508 let matches = self.qualified_fields_with_unqualified_name(name);
509 match matches.len() {
510 0 => Err(unqualified_field_not_found(name, self)),
511 1 => Ok((matches[0].0, matches[0].1)),
512 _ => {
513 let fields_without_qualifier = matches
521 .iter()
522 .filter(|(q, _)| q.is_none())
523 .collect::<Vec<_>>();
524 if fields_without_qualifier.len() == 1 {
525 Ok((fields_without_qualifier[0].0, fields_without_qualifier[0].1))
526 } else {
527 _schema_err!(SchemaError::AmbiguousReference {
528 field: Box::new(Column::new_unqualified(name.to_string()))
529 })
530 }
531 }
532 }
533 }
534
535 pub fn field_with_unqualified_name(&self, name: &str) -> Result<&Field> {
537 self.qualified_field_with_unqualified_name(name)
538 .map(|(_, field)| field)
539 }
540
541 pub fn field_with_qualified_name(
543 &self,
544 qualifier: &TableReference,
545 name: &str,
546 ) -> Result<&Field> {
547 let idx = self
548 .index_of_column_by_name(Some(qualifier), name)
549 .ok_or_else(|| field_not_found(Some(qualifier.clone()), name, self))?;
550
551 Ok(self.field(idx))
552 }
553
554 pub fn qualified_field_from_column(
556 &self,
557 column: &Column,
558 ) -> Result<(Option<&TableReference>, &Field)> {
559 self.qualified_field_with_name(column.relation.as_ref(), &column.name)
560 }
561
562 pub fn has_column_with_unqualified_name(&self, name: &str) -> bool {
564 self.fields().iter().any(|field| field.name() == name)
565 }
566
567 pub fn has_column_with_qualified_name(
569 &self,
570 qualifier: &TableReference,
571 name: &str,
572 ) -> bool {
573 self.iter()
574 .any(|(q, f)| q.map(|q| q.eq(qualifier)).unwrap_or(false) && f.name() == name)
575 }
576
577 pub fn has_column(&self, column: &Column) -> bool {
579 match &column.relation {
580 Some(r) => self.has_column_with_qualified_name(r, &column.name),
581 None => self.has_column_with_unqualified_name(&column.name),
582 }
583 }
584
585 pub fn matches_arrow_schema(&self, arrow_schema: &Schema) -> bool {
587 self.inner
588 .fields
589 .iter()
590 .zip(arrow_schema.fields().iter())
591 .all(|(dffield, arrowfield)| dffield.name() == arrowfield.name())
592 }
593
594 #[deprecated(since = "47.0.0", note = "This method is no longer used")]
596 pub fn check_arrow_schema_type_compatible(
597 &self,
598 arrow_schema: &Schema,
599 ) -> Result<()> {
600 let self_arrow_schema = self.as_arrow();
601 self_arrow_schema
602 .fields()
603 .iter()
604 .zip(arrow_schema.fields().iter())
605 .try_for_each(|(l_field, r_field)| {
606 if !can_cast_types(r_field.data_type(), l_field.data_type()) {
607 _plan_err!("Column {} (type: {}) is not compatible with column {} (type: {})",
608 r_field.name(),
609 r_field.data_type(),
610 l_field.name(),
611 l_field.data_type())
612 } else {
613 Ok(())
614 }
615 })
616 }
617
618 pub fn logically_equivalent_names_and_types(&self, other: &Self) -> bool {
624 if self.fields().len() != other.fields().len() {
625 return false;
626 }
627 let self_fields = self.iter();
628 let other_fields = other.iter();
629 self_fields.zip(other_fields).all(|((q1, f1), (q2, f2))| {
630 q1 == q2
631 && f1.name() == f2.name()
632 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
633 })
634 }
635
636 #[deprecated(since = "47.0.0", note = "Use has_equivalent_names_and_types` instead")]
637 pub fn equivalent_names_and_types(&self, other: &Self) -> bool {
638 self.has_equivalent_names_and_types(other).is_ok()
639 }
640
641 pub fn has_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
653 if self.fields().len() != other.fields().len() {
655 _plan_err!(
656 "Schema mismatch: the schema length are not same \
657 Expected schema length: {}, got: {}",
658 self.fields().len(),
659 other.fields().len()
660 )
661 } else {
662 self.fields()
665 .iter()
666 .zip(other.fields().iter())
667 .try_for_each(|(f1, f2)| {
668 if f1.name() != f2.name()
669 || (!DFSchema::datatype_is_semantically_equal(
670 f1.data_type(),
671 f2.data_type(),
672 ))
673 {
674 _plan_err!(
675 "Schema mismatch: Expected field '{}' with type {}, \
676 but got '{}' with type {}.",
677 f1.name(),
678 f1.data_type(),
679 f2.name(),
680 f2.data_type()
681 )
682 } else {
683 Ok(())
684 }
685 })
686 }
687 }
688
689 pub fn datatype_is_logically_equal(dt1: &DataType, dt2: &DataType) -> bool {
697 match (dt1, dt2) {
699 (DataType::Dictionary(_, v1), DataType::Dictionary(_, v2)) => {
700 v1.as_ref() == v2.as_ref()
701 }
702 (DataType::Dictionary(_, v1), othertype) => v1.as_ref() == othertype,
703 (othertype, DataType::Dictionary(_, v1)) => v1.as_ref() == othertype,
704 (DataType::List(f1), DataType::List(f2))
705 | (DataType::LargeList(f1), DataType::LargeList(f2))
706 | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
707 Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
710 }
711 (DataType::Map(f1, _), DataType::Map(f2, _)) => {
712 match (f1.data_type(), f2.data_type()) {
715 (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
716 f1_inner.len() == f2_inner.len()
717 && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
718 Self::datatype_is_logically_equal(
719 f1.data_type(),
720 f2.data_type(),
721 )
722 })
723 }
724 _ => panic!("Map type should have an inner struct field"),
725 }
726 }
727 (DataType::Struct(fields1), DataType::Struct(fields2)) => {
728 let iter1 = fields1.iter();
729 let iter2 = fields2.iter();
730 fields1.len() == fields2.len() &&
731 iter1
733 .zip(iter2)
734 .all(|(f1, f2)| Self::field_is_logically_equal(f1, f2))
735 }
736 (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
737 let iter1 = fields1.iter();
738 let iter2 = fields2.iter();
739 fields1.len() == fields2.len() &&
740 iter1
742 .zip(iter2)
743 .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_logically_equal(f1, f2))
744 }
745 (DataType::Utf8, DataType::Utf8View) => true,
747 (DataType::Utf8View, DataType::Utf8) => true,
748 _ => Self::datatype_is_semantically_equal(dt1, dt2),
749 }
750 }
751
752 pub fn datatype_is_semantically_equal(dt1: &DataType, dt2: &DataType) -> bool {
758 match (dt1, dt2) {
760 (DataType::Dictionary(k1, v1), DataType::Dictionary(k2, v2)) => {
761 Self::datatype_is_semantically_equal(k1.as_ref(), k2.as_ref())
762 && Self::datatype_is_semantically_equal(v1.as_ref(), v2.as_ref())
763 }
764 (DataType::List(f1), DataType::List(f2))
765 | (DataType::LargeList(f1), DataType::LargeList(f2))
766 | (DataType::FixedSizeList(f1, _), DataType::FixedSizeList(f2, _)) => {
767 Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
770 }
771 (DataType::Map(f1, _), DataType::Map(f2, _)) => {
772 match (f1.data_type(), f2.data_type()) {
775 (DataType::Struct(f1_inner), DataType::Struct(f2_inner)) => {
776 f1_inner.len() == f2_inner.len()
777 && f1_inner.iter().zip(f2_inner.iter()).all(|(f1, f2)| {
778 Self::datatype_is_semantically_equal(
779 f1.data_type(),
780 f2.data_type(),
781 )
782 })
783 }
784 _ => panic!("Map type should have an inner struct field"),
785 }
786 }
787 (DataType::Struct(fields1), DataType::Struct(fields2)) => {
788 let iter1 = fields1.iter();
789 let iter2 = fields2.iter();
790 fields1.len() == fields2.len() &&
791 iter1
793 .zip(iter2)
794 .all(|(f1, f2)| Self::field_is_semantically_equal(f1, f2))
795 }
796 (DataType::Union(fields1, _), DataType::Union(fields2, _)) => {
797 let iter1 = fields1.iter();
798 let iter2 = fields2.iter();
799 fields1.len() == fields2.len() &&
800 iter1
802 .zip(iter2)
803 .all(|((t1, f1), (t2, f2))| t1 == t2 && Self::field_is_semantically_equal(f1, f2))
804 }
805 (
806 DataType::Decimal32(_l_precision, _l_scale),
807 DataType::Decimal32(_r_precision, _r_scale),
808 ) => true,
809 (
810 DataType::Decimal64(_l_precision, _l_scale),
811 DataType::Decimal64(_r_precision, _r_scale),
812 ) => true,
813 (
814 DataType::Decimal128(_l_precision, _l_scale),
815 DataType::Decimal128(_r_precision, _r_scale),
816 ) => true,
817 (
818 DataType::Decimal256(_l_precision, _l_scale),
819 DataType::Decimal256(_r_precision, _r_scale),
820 ) => true,
821 (
822 DataType::Timestamp(_l_time_unit, _l_timezone),
823 DataType::Timestamp(_r_time_unit, _r_timezone),
824 ) => true,
825 _ => dt1 == dt2,
826 }
827 }
828
829 fn field_is_logically_equal(f1: &Field, f2: &Field) -> bool {
830 f1.name() == f2.name()
831 && Self::datatype_is_logically_equal(f1.data_type(), f2.data_type())
832 }
833
834 fn field_is_semantically_equal(f1: &Field, f2: &Field) -> bool {
835 f1.name() == f2.name()
836 && Self::datatype_is_semantically_equal(f1.data_type(), f2.data_type())
837 }
838
839 pub fn strip_qualifiers(self) -> Self {
841 DFSchema {
842 field_qualifiers: vec![None; self.inner.fields.len()],
843 inner: self.inner,
844 functional_dependencies: self.functional_dependencies,
845 }
846 }
847
848 pub fn replace_qualifier(self, qualifier: impl Into<TableReference>) -> Self {
850 let qualifier = qualifier.into();
851 DFSchema {
852 field_qualifiers: vec![Some(qualifier); self.inner.fields.len()],
853 inner: self.inner,
854 functional_dependencies: self.functional_dependencies,
855 }
856 }
857
858 pub fn field_names(&self) -> Vec<String> {
860 self.iter()
861 .map(|(qualifier, field)| qualified_name(qualifier, field.name()))
862 .collect::<Vec<_>>()
863 }
864
865 pub fn metadata(&self) -> &HashMap<String, String> {
867 &self.inner.metadata
868 }
869
870 pub fn functional_dependencies(&self) -> &FunctionalDependencies {
872 &self.functional_dependencies
873 }
874
875 pub fn field_qualifiers(&self) -> &[Option<TableReference>] {
877 &self.field_qualifiers
878 }
879
880 pub fn iter(&self) -> impl Iterator<Item = (Option<&TableReference>, &FieldRef)> {
882 self.field_qualifiers
883 .iter()
884 .zip(self.inner.fields().iter())
885 .map(|(qualifier, field)| (qualifier.as_ref(), field))
886 }
887 pub fn tree_string(&self) -> impl Display + '_ {
917 let mut result = String::from("root\n");
918
919 for (qualifier, field) in self.iter() {
920 let field_name = match qualifier {
921 Some(q) => format!("{}.{}", q, field.name()),
922 None => field.name().to_string(),
923 };
924
925 format_field_with_indent(
926 &mut result,
927 &field_name,
928 field.data_type(),
929 field.is_nullable(),
930 " ",
931 );
932 }
933
934 if result.ends_with('\n') {
936 result.pop();
937 }
938
939 result
940 }
941}
942
943fn format_field_with_indent(
945 result: &mut String,
946 field_name: &str,
947 data_type: &DataType,
948 nullable: bool,
949 indent: &str,
950) {
951 let nullable_str = nullable.to_string().to_lowercase();
952 let child_indent = format!("{indent}| ");
953
954 match data_type {
955 DataType::List(field) => {
956 result.push_str(&format!(
957 "{indent}|-- {field_name}: list (nullable = {nullable_str})\n"
958 ));
959 format_field_with_indent(
960 result,
961 field.name(),
962 field.data_type(),
963 field.is_nullable(),
964 &child_indent,
965 );
966 }
967 DataType::LargeList(field) => {
968 result.push_str(&format!(
969 "{indent}|-- {field_name}: large list (nullable = {nullable_str})\n"
970 ));
971 format_field_with_indent(
972 result,
973 field.name(),
974 field.data_type(),
975 field.is_nullable(),
976 &child_indent,
977 );
978 }
979 DataType::FixedSizeList(field, _size) => {
980 result.push_str(&format!(
981 "{indent}|-- {field_name}: fixed size list (nullable = {nullable_str})\n"
982 ));
983 format_field_with_indent(
984 result,
985 field.name(),
986 field.data_type(),
987 field.is_nullable(),
988 &child_indent,
989 );
990 }
991 DataType::Map(field, _) => {
992 result.push_str(&format!(
993 "{indent}|-- {field_name}: map (nullable = {nullable_str})\n"
994 ));
995 if let DataType::Struct(inner_fields) = field.data_type() {
996 if inner_fields.len() == 2 {
997 format_field_with_indent(
998 result,
999 "key",
1000 inner_fields[0].data_type(),
1001 inner_fields[0].is_nullable(),
1002 &child_indent,
1003 );
1004 let value_contains_null =
1005 field.is_nullable().to_string().to_lowercase();
1006 match inner_fields[1].data_type() {
1008 DataType::Struct(_)
1009 | DataType::List(_)
1010 | DataType::LargeList(_)
1011 | DataType::FixedSizeList(_, _)
1012 | DataType::Map(_, _) => {
1013 format_field_with_indent(
1014 result,
1015 "value",
1016 inner_fields[1].data_type(),
1017 inner_fields[1].is_nullable(),
1018 &child_indent,
1019 );
1020 }
1021 _ => {
1022 result.push_str(&format!("{child_indent}|-- value: {} (nullable = {value_contains_null})\n",
1023 format_simple_data_type(inner_fields[1].data_type())));
1024 }
1025 }
1026 }
1027 }
1028 }
1029 DataType::Struct(fields) => {
1030 result.push_str(&format!(
1031 "{indent}|-- {field_name}: struct (nullable = {nullable_str})\n"
1032 ));
1033 for struct_field in fields {
1034 format_field_with_indent(
1035 result,
1036 struct_field.name(),
1037 struct_field.data_type(),
1038 struct_field.is_nullable(),
1039 &child_indent,
1040 );
1041 }
1042 }
1043 _ => {
1044 let type_str = format_simple_data_type(data_type);
1045 result.push_str(&format!(
1046 "{indent}|-- {field_name}: {type_str} (nullable = {nullable_str})\n"
1047 ));
1048 }
1049 }
1050}
1051
1052fn format_simple_data_type(data_type: &DataType) -> String {
1054 match data_type {
1055 DataType::Boolean => "boolean".to_string(),
1056 DataType::Int8 => "int8".to_string(),
1057 DataType::Int16 => "int16".to_string(),
1058 DataType::Int32 => "int32".to_string(),
1059 DataType::Int64 => "int64".to_string(),
1060 DataType::UInt8 => "uint8".to_string(),
1061 DataType::UInt16 => "uint16".to_string(),
1062 DataType::UInt32 => "uint32".to_string(),
1063 DataType::UInt64 => "uint64".to_string(),
1064 DataType::Float16 => "float16".to_string(),
1065 DataType::Float32 => "float32".to_string(),
1066 DataType::Float64 => "float64".to_string(),
1067 DataType::Utf8 => "utf8".to_string(),
1068 DataType::LargeUtf8 => "large_utf8".to_string(),
1069 DataType::Binary => "binary".to_string(),
1070 DataType::LargeBinary => "large_binary".to_string(),
1071 DataType::FixedSizeBinary(_) => "fixed_size_binary".to_string(),
1072 DataType::Date32 => "date32".to_string(),
1073 DataType::Date64 => "date64".to_string(),
1074 DataType::Time32(_) => "time32".to_string(),
1075 DataType::Time64(_) => "time64".to_string(),
1076 DataType::Timestamp(_, tz) => match tz {
1077 Some(tz_str) => format!("timestamp ({tz_str})"),
1078 None => "timestamp".to_string(),
1079 },
1080 DataType::Interval(_) => "interval".to_string(),
1081 DataType::Dictionary(_, value_type) => {
1082 format_simple_data_type(value_type.as_ref())
1083 }
1084 DataType::Decimal32(precision, scale) => {
1085 format!("decimal32({precision}, {scale})")
1086 }
1087 DataType::Decimal64(precision, scale) => {
1088 format!("decimal64({precision}, {scale})")
1089 }
1090 DataType::Decimal128(precision, scale) => {
1091 format!("decimal128({precision}, {scale})")
1092 }
1093 DataType::Decimal256(precision, scale) => {
1094 format!("decimal256({precision}, {scale})")
1095 }
1096 DataType::Null => "null".to_string(),
1097 _ => format!("{data_type}").to_lowercase(),
1098 }
1099}
1100
1101impl AsRef<Schema> for DFSchema {
1103 fn as_ref(&self) -> &Schema {
1104 self.as_arrow()
1105 }
1106}
1107
1108impl AsRef<SchemaRef> for DFSchema {
1111 fn as_ref(&self) -> &SchemaRef {
1112 self.inner()
1113 }
1114}
1115
1116impl TryFrom<Schema> for DFSchema {
1118 type Error = DataFusionError;
1119 fn try_from(schema: Schema) -> Result<Self, Self::Error> {
1120 Self::try_from(Arc::new(schema))
1121 }
1122}
1123
1124impl TryFrom<SchemaRef> for DFSchema {
1125 type Error = DataFusionError;
1126 fn try_from(schema: SchemaRef) -> Result<Self, Self::Error> {
1127 let field_count = schema.fields.len();
1128 let dfschema = Self {
1129 inner: schema,
1130 field_qualifiers: vec![None; field_count],
1131 functional_dependencies: FunctionalDependencies::empty(),
1132 };
1133 Ok(dfschema)
1139 }
1140}
1141
1142impl Hash for DFSchema {
1144 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
1145 self.inner.fields.hash(state);
1146 self.inner.metadata.len().hash(state); }
1148}
1149
1150pub trait ToDFSchema
1152where
1153 Self: Sized,
1154{
1155 fn to_dfschema(self) -> Result<DFSchema>;
1157
1158 fn to_dfschema_ref(self) -> Result<DFSchemaRef> {
1160 Ok(Arc::new(self.to_dfschema()?))
1161 }
1162}
1163
1164impl ToDFSchema for Schema {
1165 fn to_dfschema(self) -> Result<DFSchema> {
1166 DFSchema::try_from(self)
1167 }
1168}
1169
1170impl ToDFSchema for SchemaRef {
1171 fn to_dfschema(self) -> Result<DFSchema> {
1172 DFSchema::try_from(self)
1173 }
1174}
1175
1176impl ToDFSchema for Vec<Field> {
1177 fn to_dfschema(self) -> Result<DFSchema> {
1178 let field_count = self.len();
1179 let schema = Schema {
1180 fields: self.into(),
1181 metadata: HashMap::new(),
1182 };
1183 let dfschema = DFSchema {
1184 inner: schema.into(),
1185 field_qualifiers: vec![None; field_count],
1186 functional_dependencies: FunctionalDependencies::empty(),
1187 };
1188 Ok(dfschema)
1189 }
1190}
1191
1192impl Display for DFSchema {
1193 fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
1194 write!(
1195 f,
1196 "fields:[{}], metadata:{:?}",
1197 self.iter()
1198 .map(|(q, f)| qualified_name(q, f.name()))
1199 .collect::<Vec<String>>()
1200 .join(", "),
1201 self.inner.metadata
1202 )
1203 }
1204}
1205
1206pub trait ExprSchema: std::fmt::Debug {
1212 fn nullable(&self, col: &Column) -> Result<bool> {
1214 Ok(self.field_from_column(col)?.is_nullable())
1215 }
1216
1217 fn data_type(&self, col: &Column) -> Result<&DataType> {
1219 Ok(self.field_from_column(col)?.data_type())
1220 }
1221
1222 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1224 Ok(self.field_from_column(col)?.metadata())
1225 }
1226
1227 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1229 let field = self.field_from_column(col)?;
1230 Ok((field.data_type(), field.is_nullable()))
1231 }
1232
1233 fn field_from_column(&self, col: &Column) -> Result<&Field>;
1235}
1236
1237impl<P: AsRef<DFSchema> + std::fmt::Debug> ExprSchema for P {
1239 fn nullable(&self, col: &Column) -> Result<bool> {
1240 self.as_ref().nullable(col)
1241 }
1242
1243 fn data_type(&self, col: &Column) -> Result<&DataType> {
1244 self.as_ref().data_type(col)
1245 }
1246
1247 fn metadata(&self, col: &Column) -> Result<&HashMap<String, String>> {
1248 ExprSchema::metadata(self.as_ref(), col)
1249 }
1250
1251 fn data_type_and_nullable(&self, col: &Column) -> Result<(&DataType, bool)> {
1252 self.as_ref().data_type_and_nullable(col)
1253 }
1254
1255 fn field_from_column(&self, col: &Column) -> Result<&Field> {
1256 self.as_ref().field_from_column(col)
1257 }
1258}
1259
1260impl ExprSchema for DFSchema {
1261 fn field_from_column(&self, col: &Column) -> Result<&Field> {
1262 match &col.relation {
1263 Some(r) => self.field_with_qualified_name(r, &col.name),
1264 None => self.field_with_unqualified_name(&col.name),
1265 }
1266 }
1267}
1268
1269pub trait SchemaExt {
1271 fn equivalent_names_and_types(&self, other: &Self) -> bool;
1276
1277 fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()>;
1285}
1286
1287impl SchemaExt for Schema {
1288 fn equivalent_names_and_types(&self, other: &Self) -> bool {
1289 if self.fields().len() != other.fields().len() {
1290 return false;
1291 }
1292
1293 self.fields()
1294 .iter()
1295 .zip(other.fields().iter())
1296 .all(|(f1, f2)| {
1297 f1.name() == f2.name()
1298 && DFSchema::datatype_is_semantically_equal(
1299 f1.data_type(),
1300 f2.data_type(),
1301 )
1302 })
1303 }
1304
1305 fn logically_equivalent_names_and_types(&self, other: &Self) -> Result<()> {
1307 if self.fields().len() != other.fields().len() {
1309 _plan_err!(
1310 "Inserting query must have the same schema length as the table. \
1311 Expected table schema length: {}, got: {}",
1312 self.fields().len(),
1313 other.fields().len()
1314 )
1315 } else {
1316 self.fields()
1319 .iter()
1320 .zip(other.fields().iter())
1321 .try_for_each(|(f1, f2)| {
1322 if f1.name() != f2.name() || (!DFSchema::datatype_is_logically_equal(f1.data_type(), f2.data_type()) && !can_cast_types(f2.data_type(), f1.data_type())) {
1323 _plan_err!(
1324 "Inserting query schema mismatch: Expected table field '{}' with type {}, \
1325 but got '{}' with type {}.",
1326 f1.name(),
1327 f1.data_type(),
1328 f2.name(),
1329 f2.data_type())
1330 } else {
1331 Ok(())
1332 }
1333 })
1334 }
1335 }
1336}
1337
1338pub fn qualified_name(qualifier: Option<&TableReference>, name: &str) -> String {
1339 match qualifier {
1340 Some(q) => format!("{q}.{name}"),
1341 None => name.to_string(),
1342 }
1343}
1344
1345#[cfg(test)]
1346mod tests {
1347 use crate::assert_contains;
1348
1349 use super::*;
1350
// A column built from the single name "t1.c0" must not be found in a schema
// whose fields are qualified by `t1`; the error message is checked verbatim.
#[test]
fn qualifier_in_name() -> Result<()> {
    let qualified = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
    let lookup = Column::from_name("t1.c0");
    let message = qualified
        .index_of_column(&lookup)
        .unwrap_err()
        .strip_backtrace();
    let expected = "Schema error: No field named \"t1.c0\". \
        Column names are case sensitive. \
        You can use double quotes to refer to the \"\"t1.c0\"\" column \
        or set the datafusion.sql_parser.enable_ident_normalization configuration. \
        Did you mean 't1.c0'?.";
    assert_eq!(message, expected);
    Ok(())
}
1365
// Field names that need quoting must appear quoted in the "Valid fields"
// list of the not-found error.
#[test]
fn quoted_qualifiers_in_name() -> Result<()> {
    let arrow_schema = Schema::new(vec![
        Field::new("CapitalColumn", DataType::Boolean, true),
        Field::new("field.with.period", DataType::Boolean, true),
    ]);
    let qualified = DFSchema::try_from_qualified_schema("t1", &arrow_schema)?;
    let lookup = Column::from_name("t1.c0");

    let message = qualified
        .index_of_column(&lookup)
        .unwrap_err()
        .strip_backtrace();
    let expected = "Schema error: No field named \"t1.c0\". \
        Valid fields are t1.\"CapitalColumn\", t1.\"field.with.period\".";
    assert_eq!(message, expected);
    Ok(())
}
1384
1385 #[test]
1386 fn from_unqualified_schema() -> Result<()> {
1387 let schema = DFSchema::try_from(test_schema_1())?;
1388 assert_eq!("fields:[c0, c1], metadata:{}", schema.to_string());
1389 Ok(())
1390 }
1391
1392 #[test]
1393 fn from_qualified_schema() -> Result<()> {
1394 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1395 assert_eq!("fields:[t1.c0, t1.c1], metadata:{}", schema.to_string());
1396 Ok(())
1397 }
1398
1399 #[test]
1400 fn test_from_field_specific_qualified_schema() -> Result<()> {
1401 let schema = DFSchema::from_field_specific_qualified_schema(
1402 vec![Some("t1".into()), None],
1403 &Arc::new(Schema::new(vec![
1404 Field::new("c0", DataType::Boolean, true),
1405 Field::new("c1", DataType::Boolean, true),
1406 ])),
1407 )?;
1408 assert_eq!("fields:[t1.c0, c1], metadata:{}", schema.to_string());
1409 Ok(())
1410 }
1411
1412 #[test]
1413 fn test_from_qualified_fields() -> Result<()> {
1414 let schema = DFSchema::new_with_metadata(
1415 vec![
1416 (
1417 Some("t0".into()),
1418 Arc::new(Field::new("c0", DataType::Boolean, true)),
1419 ),
1420 (None, Arc::new(Field::new("c1", DataType::Boolean, true))),
1421 ],
1422 HashMap::new(),
1423 )?;
1424 assert_eq!("fields:[t0.c0, c1], metadata:{}", schema.to_string());
1425 Ok(())
1426 }
1427
1428 #[test]
1429 fn from_qualified_schema_into_arrow_schema() -> Result<()> {
1430 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1431 let arrow_schema = schema.as_arrow();
1432 insta::assert_snapshot!(arrow_schema.to_string(), @r#"Field { "c0": nullable Boolean }, Field { "c1": nullable Boolean }"#);
1433 Ok(())
1434 }
1435
1436 #[test]
1437 fn join_qualified() -> Result<()> {
1438 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1439 let right = DFSchema::try_from_qualified_schema("t2", &test_schema_1())?;
1440 let join = left.join(&right)?;
1441 assert_eq!(
1442 "fields:[t1.c0, t1.c1, t2.c0, t2.c1], metadata:{}",
1443 join.to_string()
1444 );
1445 assert!(join
1447 .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1448 .is_ok());
1449 assert!(join
1450 .field_with_qualified_name(&TableReference::bare("t2"), "c0")
1451 .is_ok());
1452 assert!(join.field_with_unqualified_name("c0").is_err());
1454 assert!(join.field_with_unqualified_name("t1.c0").is_err());
1455 assert!(join.field_with_unqualified_name("t2.c0").is_err());
1456 Ok(())
1457 }
1458
1459 #[test]
1460 fn join_qualified_duplicate() -> Result<()> {
1461 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1462 let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1463 let join = left.join(&right);
1464 assert_eq!(
1465 join.unwrap_err().strip_backtrace(),
1466 "Schema error: Schema contains duplicate qualified field name t1.c0",
1467 );
1468 Ok(())
1469 }
1470
1471 #[test]
1472 fn join_unqualified_duplicate() -> Result<()> {
1473 let left = DFSchema::try_from(test_schema_1())?;
1474 let right = DFSchema::try_from(test_schema_1())?;
1475 let join = left.join(&right);
1476 assert_eq!(
1477 join.unwrap_err().strip_backtrace(),
1478 "Schema error: Schema contains duplicate unqualified field name c0"
1479 );
1480 Ok(())
1481 }
1482
1483 #[test]
1484 fn join_mixed() -> Result<()> {
1485 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1486 let right = DFSchema::try_from(test_schema_2())?;
1487 let join = left.join(&right)?;
1488 assert_eq!(
1489 "fields:[t1.c0, t1.c1, c100, c101], metadata:{}",
1490 join.to_string()
1491 );
1492 assert!(join
1494 .field_with_qualified_name(&TableReference::bare("t1"), "c0")
1495 .is_ok());
1496 assert!(join.field_with_unqualified_name("c0").is_ok());
1497 assert!(join.field_with_unqualified_name("c100").is_ok());
1498 assert!(join.field_with_name(None, "c100").is_ok());
1499 assert!(join.field_with_unqualified_name("t1.c0").is_err());
1501 assert!(join.field_with_unqualified_name("t1.c100").is_err());
1502 assert!(join
1503 .field_with_qualified_name(&TableReference::bare(""), "c100")
1504 .is_err());
1505 Ok(())
1506 }
1507
1508 #[test]
1509 fn join_mixed_duplicate() -> Result<()> {
1510 let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1511 let right = DFSchema::try_from(test_schema_1())?;
1512 let join = left.join(&right);
1513 assert_contains!(join.unwrap_err().to_string(),
1514 "Schema error: Schema contains qualified \
1515 field name t1.c0 and unqualified field name c0 which would be ambiguous");
1516 Ok(())
1517 }
1518
1519 #[test]
1520 fn helpful_error_messages() -> Result<()> {
1521 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1522 let expected_help = "Valid fields are t1.c0, t1.c1.";
1523 assert_contains!(
1524 schema
1525 .field_with_qualified_name(&TableReference::bare("x"), "y")
1526 .unwrap_err()
1527 .to_string(),
1528 expected_help
1529 );
1530 assert_contains!(
1531 schema
1532 .field_with_unqualified_name("y")
1533 .unwrap_err()
1534 .to_string(),
1535 expected_help
1536 );
1537 assert!(schema.index_of_column_by_name(None, "y").is_none());
1538 assert!(schema.index_of_column_by_name(None, "t1.c0").is_none());
1539
1540 Ok(())
1541 }
1542
1543 #[test]
1544 fn select_without_valid_fields() {
1545 let schema = DFSchema::empty();
1546
1547 let col = Column::from_qualified_name("t1.c0");
1548 let err = schema.index_of_column(&col).unwrap_err();
1549 let expected = "Schema error: No field named t1.c0.";
1550 assert_eq!(err.strip_backtrace(), expected);
1551
1552 let col = Column::from_name("c0");
1554 let err = schema.index_of_column(&col).err().unwrap();
1555 let expected = "Schema error: No field named c0.";
1556 assert_eq!(err.strip_backtrace(), expected);
1557 }
1558
1559 #[test]
1560 fn into() {
1561 let arrow_schema = Schema::new_with_metadata(
1563 vec![Field::new("c0", DataType::Int64, true)],
1564 test_metadata(),
1565 );
1566 let arrow_schema_ref = Arc::new(arrow_schema.clone());
1567
1568 let df_schema = DFSchema {
1569 inner: Arc::clone(&arrow_schema_ref),
1570 field_qualifiers: vec![None; arrow_schema_ref.fields.len()],
1571 functional_dependencies: FunctionalDependencies::empty(),
1572 };
1573 let df_schema_ref = Arc::new(df_schema.clone());
1574
1575 {
1576 let arrow_schema = arrow_schema.clone();
1577 let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1578
1579 assert_eq!(df_schema, arrow_schema.to_dfschema().unwrap());
1580 assert_eq!(df_schema, arrow_schema_ref.to_dfschema().unwrap());
1581 }
1582
1583 {
1584 let arrow_schema = arrow_schema.clone();
1585 let arrow_schema_ref = Arc::clone(&arrow_schema_ref);
1586
1587 assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1588 assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1589 }
1590
1591 assert_eq!(df_schema_ref, arrow_schema.to_dfschema_ref().unwrap());
1593 assert_eq!(df_schema_ref, arrow_schema_ref.to_dfschema_ref().unwrap());
1594 }
1595
1596 fn test_schema_1() -> Schema {
1597 Schema::new(vec![
1598 Field::new("c0", DataType::Boolean, true),
1599 Field::new("c1", DataType::Boolean, true),
1600 ])
1601 }
1602 #[test]
1603 fn test_dfschema_to_schema_conversion() {
1604 let mut a_metadata = HashMap::new();
1605 a_metadata.insert("key".to_string(), "value".to_string());
1606 let a_field = Field::new("a", DataType::Int64, false).with_metadata(a_metadata);
1607
1608 let mut b_metadata = HashMap::new();
1609 b_metadata.insert("key".to_string(), "value".to_string());
1610 let b_field = Field::new("b", DataType::Int64, false).with_metadata(b_metadata);
1611
1612 let schema = Arc::new(Schema::new(vec![a_field, b_field]));
1613
1614 let df_schema = DFSchema {
1615 inner: Arc::clone(&schema),
1616 field_qualifiers: vec![None; schema.fields.len()],
1617 functional_dependencies: FunctionalDependencies::empty(),
1618 };
1619
1620 assert_eq!(df_schema.inner.metadata(), schema.metadata())
1621 }
1622
1623 #[test]
1624 fn test_contain_column() -> Result<()> {
1625 {
1627 let col = Column::from_qualified_name("t1.c0");
1628 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1629 assert!(schema.is_column_from_schema(&col));
1630 }
1631
1632 {
1634 let col = Column::from_qualified_name("t1.c2");
1635 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1636 assert!(!schema.is_column_from_schema(&col));
1637 }
1638
1639 {
1641 let col = Column::from_name("c0");
1642 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1643 assert!(schema.is_column_from_schema(&col));
1644 }
1645
1646 {
1648 let col = Column::from_name("c2");
1649 let schema = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1650 assert!(!schema.is_column_from_schema(&col));
1651 }
1652
1653 Ok(())
1654 }
1655
1656 #[test]
1657 fn test_datatype_is_logically_equal() {
1658 assert!(DFSchema::datatype_is_logically_equal(
1659 &DataType::Int8,
1660 &DataType::Int8
1661 ));
1662
1663 assert!(!DFSchema::datatype_is_logically_equal(
1664 &DataType::Int8,
1665 &DataType::Int16
1666 ));
1667
1668 assert!(DFSchema::datatype_is_logically_equal(
1672 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1673 &DataType::List(Field::new("element", DataType::Int8, false).into())
1674 ));
1675
1676 assert!(!DFSchema::datatype_is_logically_equal(
1678 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1679 &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1680 ));
1681
1682 let map_field = DataType::Map(
1684 Field::new(
1685 "entries",
1686 DataType::Struct(Fields::from(vec![
1687 Field::new("key", DataType::Int8, false),
1688 Field::new("value", DataType::Int8, true),
1689 ])),
1690 true,
1691 )
1692 .into(),
1693 true,
1694 );
1695
1696 assert!(DFSchema::datatype_is_logically_equal(
1698 &map_field,
1699 &DataType::Map(
1700 Field::new(
1701 "pairs",
1702 DataType::Struct(Fields::from(vec![
1703 Field::new("one", DataType::Int8, false),
1704 Field::new("two", DataType::Int8, false)
1705 ])),
1706 true
1707 )
1708 .into(),
1709 true
1710 )
1711 ));
1712 assert!(!DFSchema::datatype_is_logically_equal(
1714 &map_field,
1715 &DataType::Map(
1716 Field::new(
1717 "entries",
1718 DataType::Struct(Fields::from(vec![
1719 Field::new("key", DataType::Int8, false),
1720 Field::new("value", DataType::Int16, true)
1721 ])),
1722 true
1723 )
1724 .into(),
1725 true
1726 )
1727 ));
1728
1729 assert!(!DFSchema::datatype_is_logically_equal(
1731 &map_field,
1732 &DataType::Map(
1733 Field::new(
1734 "entries",
1735 DataType::Struct(Fields::from(vec![
1736 Field::new("key", DataType::Int16, false),
1737 Field::new("value", DataType::Int8, true)
1738 ])),
1739 true
1740 )
1741 .into(),
1742 true
1743 )
1744 ));
1745
1746 let struct_field = DataType::Struct(Fields::from(vec![
1749 Field::new("a", DataType::Int8, true),
1750 Field::new("b", DataType::Int8, true),
1751 ]));
1752
1753 assert!(DFSchema::datatype_is_logically_equal(
1755 &struct_field,
1756 &DataType::Struct(Fields::from(vec![
1757 Field::new("a", DataType::Int8, false),
1758 Field::new("b", DataType::Int8, true),
1759 ]))
1760 ));
1761
1762 assert!(!DFSchema::datatype_is_logically_equal(
1764 &struct_field,
1765 &DataType::Struct(Fields::from(vec![
1766 Field::new("x", DataType::Int8, true),
1767 Field::new("y", DataType::Int8, true),
1768 ]))
1769 ));
1770
1771 assert!(!DFSchema::datatype_is_logically_equal(
1773 &struct_field,
1774 &DataType::Struct(Fields::from(vec![
1775 Field::new("a", DataType::Int16, true),
1776 Field::new("b", DataType::Int8, true),
1777 ]))
1778 ));
1779
1780 assert!(!DFSchema::datatype_is_logically_equal(
1782 &struct_field,
1783 &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1784 ));
1785 }
1786
1787 #[test]
1788 fn test_datatype_is_logically_equivalent_to_dictionary() {
1789 assert!(DFSchema::datatype_is_logically_equal(
1791 &DataType::Utf8,
1792 &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1793 ));
1794 }
1795
1796 #[test]
1797 fn test_datatype_is_semantically_equal() {
1798 assert!(DFSchema::datatype_is_semantically_equal(
1799 &DataType::Int8,
1800 &DataType::Int8
1801 ));
1802
1803 assert!(!DFSchema::datatype_is_semantically_equal(
1804 &DataType::Int8,
1805 &DataType::Int16
1806 ));
1807
1808 assert!(DFSchema::datatype_is_semantically_equal(
1810 &DataType::Decimal32(1, 2),
1811 &DataType::Decimal32(2, 1),
1812 ));
1813
1814 assert!(DFSchema::datatype_is_semantically_equal(
1815 &DataType::Decimal64(1, 2),
1816 &DataType::Decimal64(2, 1),
1817 ));
1818
1819 assert!(DFSchema::datatype_is_semantically_equal(
1820 &DataType::Decimal128(1, 2),
1821 &DataType::Decimal128(2, 1),
1822 ));
1823
1824 assert!(DFSchema::datatype_is_semantically_equal(
1825 &DataType::Decimal256(1, 2),
1826 &DataType::Decimal256(2, 1),
1827 ));
1828
1829 assert!(DFSchema::datatype_is_semantically_equal(
1831 &DataType::Timestamp(
1832 arrow::datatypes::TimeUnit::Microsecond,
1833 Some("UTC".into())
1834 ),
1835 &DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
1836 ));
1837
1838 assert!(DFSchema::datatype_is_semantically_equal(
1842 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1843 &DataType::List(Field::new("element", DataType::Int8, false).into())
1844 ));
1845
1846 assert!(!DFSchema::datatype_is_semantically_equal(
1848 &DataType::List(Field::new_list_field(DataType::Int8, true).into()),
1849 &DataType::List(Field::new_list_field(DataType::Int16, true).into())
1850 ));
1851
1852 let map_field = DataType::Map(
1854 Field::new(
1855 "entries",
1856 DataType::Struct(Fields::from(vec![
1857 Field::new("key", DataType::Int8, false),
1858 Field::new("value", DataType::Int8, true),
1859 ])),
1860 true,
1861 )
1862 .into(),
1863 true,
1864 );
1865
1866 assert!(DFSchema::datatype_is_semantically_equal(
1868 &map_field,
1869 &DataType::Map(
1870 Field::new(
1871 "pairs",
1872 DataType::Struct(Fields::from(vec![
1873 Field::new("one", DataType::Int8, false),
1874 Field::new("two", DataType::Int8, false)
1875 ])),
1876 true
1877 )
1878 .into(),
1879 true
1880 )
1881 ));
1882 assert!(!DFSchema::datatype_is_semantically_equal(
1884 &map_field,
1885 &DataType::Map(
1886 Field::new(
1887 "entries",
1888 DataType::Struct(Fields::from(vec![
1889 Field::new("key", DataType::Int8, false),
1890 Field::new("value", DataType::Int16, true)
1891 ])),
1892 true
1893 )
1894 .into(),
1895 true
1896 )
1897 ));
1898
1899 assert!(!DFSchema::datatype_is_semantically_equal(
1901 &map_field,
1902 &DataType::Map(
1903 Field::new(
1904 "entries",
1905 DataType::Struct(Fields::from(vec![
1906 Field::new("key", DataType::Int16, false),
1907 Field::new("value", DataType::Int8, true)
1908 ])),
1909 true
1910 )
1911 .into(),
1912 true
1913 )
1914 ));
1915
1916 let struct_field = DataType::Struct(Fields::from(vec![
1919 Field::new("a", DataType::Int8, true),
1920 Field::new("b", DataType::Int8, true),
1921 ]));
1922
1923 assert!(DFSchema::datatype_is_logically_equal(
1925 &struct_field,
1926 &DataType::Struct(Fields::from(vec![
1927 Field::new("a", DataType::Int8, false),
1928 Field::new("b", DataType::Int8, true),
1929 ]))
1930 ));
1931
1932 assert!(!DFSchema::datatype_is_logically_equal(
1934 &struct_field,
1935 &DataType::Struct(Fields::from(vec![
1936 Field::new("x", DataType::Int8, true),
1937 Field::new("y", DataType::Int8, true),
1938 ]))
1939 ));
1940
1941 assert!(!DFSchema::datatype_is_logically_equal(
1943 &struct_field,
1944 &DataType::Struct(Fields::from(vec![
1945 Field::new("a", DataType::Int16, true),
1946 Field::new("b", DataType::Int8, true),
1947 ]))
1948 ));
1949
1950 assert!(!DFSchema::datatype_is_logically_equal(
1952 &struct_field,
1953 &DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int8, true),]))
1954 ));
1955 }
1956
1957 #[test]
1958 fn test_datatype_is_not_semantically_equivalent_to_dictionary() {
1959 assert!(!DFSchema::datatype_is_semantically_equal(
1961 &DataType::Utf8,
1962 &DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))
1963 ));
1964 }
1965
1966 fn test_schema_2() -> Schema {
1967 Schema::new(vec![
1968 Field::new("c100", DataType::Boolean, true),
1969 Field::new("c101", DataType::Boolean, true),
1970 ])
1971 }
1972
1973 fn test_metadata() -> HashMap<String, String> {
1974 test_metadata_n(2)
1975 }
1976
1977 fn test_metadata_n(n: usize) -> HashMap<String, String> {
1978 (0..n).map(|i| (format!("k{i}"), format!("v{i}"))).collect()
1979 }
1980
    /// Renders a four-column unqualified schema with `tree_string` and compares
    /// the result against the inline snapshot.
    #[test]
    fn test_print_schema_unqualified() {
        // Mix of nullable and non-nullable primitive columns.
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
                Field::new("age", DataType::Int64, true),
                Field::new("active", DataType::Boolean, false),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- id: int32 (nullable = false)
        |-- name: utf8 (nullable = true)
        |-- age: int64 (nullable = true)
        |-- active: boolean (nullable = false)
        ");
    }
2005
    /// Renders a schema qualified with `table1` and checks that the snapshot
    /// shows each field as `table1.<name>`.
    #[test]
    fn test_print_schema_qualified() {
        let schema = DFSchema::try_from_qualified_schema(
            "table1",
            &Schema::new(vec![
                Field::new("id", DataType::Int32, false),
                Field::new("name", DataType::Utf8, true),
            ]),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- table1.id: int32 (nullable = false)
        |-- table1.name: utf8 (nullable = true)
        ");
    }
2025
    /// Renders a schema containing a struct, a list and a decimal column and
    /// compares the tree output, including the nested children, against the
    /// inline snapshot.
    #[test]
    fn test_print_schema_complex_types() {
        // Struct column with two nullable string children.
        let struct_field = Field::new(
            "address",
            DataType::Struct(Fields::from(vec![
                Field::new("street", DataType::Utf8, true),
                Field::new("city", DataType::Utf8, true),
            ])),
            true,
        );

        // List column of nullable strings.
        let list_field = Field::new(
            "tags",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("id", DataType::Int32, false),
                struct_field,
                list_field,
                Field::new("score", DataType::Decimal128(10, 2), true),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();
        insta::assert_snapshot!(output, @r"
        root
        |-- id: int32 (nullable = false)
        |-- address: struct (nullable = true)
        | |-- street: utf8 (nullable = true)
        | |-- city: utf8 (nullable = true)
        |-- tags: list (nullable = true)
        | |-- item: utf8 (nullable = true)
        |-- score: decimal128(10, 2) (nullable = true)
        ");
    }
2067
2068 #[test]
2069 fn test_print_schema_empty() {
2070 let schema = DFSchema::empty();
2071 let output = schema.tree_string();
2072 insta::assert_snapshot!(output, @r###"root"###);
2073 }
2074
    /// Renders a schema with a struct, a list of structs, a map whose values
    /// are lists, and a timestamp with a time zone, then compares the tree
    /// output against the inline snapshot.
    #[test]
    fn test_print_schema_deeply_nested_types() {
        // Struct with two primitive children.
        let inner_struct = Field::new(
            "inner",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new("level2", DataType::Int32, false),
            ])),
            true,
        );

        // List whose items are structs.
        let nested_list = Field::new(
            "nested_list",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int64, false),
                    Field::new("value", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // Map from string keys to lists of int32.
        let map_field = Field::new(
            "map_data",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::List(Arc::new(Field::new(
                                "item",
                                DataType::Int32,
                                true,
                            ))),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("simple_field", DataType::Utf8, true),
                inner_struct,
                nested_list,
                map_field,
                Field::new(
                    "timestamp_field",
                    DataType::Timestamp(
                        arrow::datatypes::TimeUnit::Microsecond,
                        Some("UTC".into()),
                    ),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- simple_field: utf8 (nullable = true)
        |-- inner: struct (nullable = true)
        | |-- level1: utf8 (nullable = true)
        | |-- level2: int32 (nullable = false)
        |-- nested_list: list (nullable = true)
        | |-- item: struct (nullable = true)
        | | |-- id: int64 (nullable = false)
        | | |-- value: float64 (nullable = true)
        |-- map_data: map (nullable = true)
        | |-- key: utf8 (nullable = false)
        | |-- value: list (nullable = true)
        | | |-- item: int32 (nullable = true)
        |-- timestamp_field: timestamp (UTC) (nullable = false)
        ");
    }
2163
    /// Renders a schema that interleaves qualified (`table1`, `table2`) and
    /// unqualified fields and checks that only the qualified ones are prefixed
    /// in the snapshot.
    #[test]
    fn test_print_schema_mixed_qualified_unqualified() {
        let schema = DFSchema::new_with_metadata(
            vec![
                (
                    Some("table1".into()),
                    Arc::new(Field::new("id", DataType::Int32, false)),
                ),
                (None, Arc::new(Field::new("name", DataType::Utf8, true))),
                (
                    Some("table2".into()),
                    Arc::new(Field::new("score", DataType::Float64, true)),
                ),
                (
                    None,
                    Arc::new(Field::new("active", DataType::Boolean, false)),
                ),
            ],
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- table1.id: int32 (nullable = false)
        |-- name: utf8 (nullable = true)
        |-- table2.score: float64 (nullable = true)
        |-- active: boolean (nullable = false)
        ");
    }
2197
    /// Renders a single column that is a list of maps and compares the tree
    /// output against the inline snapshot, in which the map's `key` and
    /// `value` appear directly under the list item.
    #[test]
    fn test_print_schema_array_of_map() {
        // The `entries` struct that backs the map type.
        let map_field = Field::new(
            "entries",
            DataType::Struct(Fields::from(vec![
                Field::new("key", DataType::Utf8, false),
                Field::new("value", DataType::Utf8, false),
            ])),
            false,
        );

        // Non-nullable list whose items are non-nullable maps.
        let array_of_map_field = Field::new(
            "array_map_field",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(Arc::new(map_field), false),
                false,
            ))),
            false,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![array_of_map_field].into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- array_map_field: list (nullable = false)
        | |-- item: map (nullable = false)
        | | |-- key: utf8 (nullable = false)
        | | |-- value: utf8 (nullable = false)
        ");
    }
2236
    /// Renders a schema that combines lists, structs and maps in several
    /// nesting arrangements and compares the tree output against the inline
    /// snapshot.
    #[test]
    fn test_print_schema_complex_type_combinations() {
        // List whose items are structs.
        let list_of_structs = Field::new(
            "list_of_structs",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("id", DataType::Int32, false),
                    Field::new("name", DataType::Utf8, true),
                    Field::new("score", DataType::Float64, true),
                ])),
                true,
            ))),
            true,
        );

        // Struct with two list children and one plain string child.
        let struct_with_lists = Field::new(
            "struct_with_lists",
            DataType::Struct(Fields::from(vec![
                Field::new(
                    "tags",
                    DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
                    true,
                ),
                Field::new(
                    "scores",
                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                    false,
                ),
                Field::new("metadata", DataType::Utf8, true),
            ])),
            false,
        );

        // Map from string keys to struct values.
        let map_with_struct_values = Field::new(
            "map_with_struct_values",
            DataType::Map(
                Arc::new(Field::new(
                    "entries",
                    DataType::Struct(Fields::from(vec![
                        Field::new("key", DataType::Utf8, false),
                        Field::new(
                            "value",
                            DataType::Struct(Fields::from(vec![
                                Field::new("count", DataType::Int64, false),
                                Field::new("active", DataType::Boolean, true),
                            ])),
                            true,
                        ),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        );

        // List whose items are maps from string to int32.
        let list_of_maps = Field::new(
            "list_of_maps",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Map(
                    Arc::new(Field::new(
                        "entries",
                        DataType::Struct(Fields::from(vec![
                            Field::new("key", DataType::Utf8, false),
                            Field::new("value", DataType::Int32, true),
                        ])),
                        false,
                    )),
                    false,
                ),
                true,
            ))),
            true,
        );

        // Struct -> list -> struct -> map.
        let deeply_nested = Field::new(
            "deeply_nested",
            DataType::Struct(Fields::from(vec![
                Field::new("level1", DataType::Utf8, true),
                Field::new(
                    "level2",
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(Fields::from(vec![
                            Field::new("id", DataType::Int32, false),
                            Field::new(
                                "properties",
                                DataType::Map(
                                    Arc::new(Field::new(
                                        "entries",
                                        DataType::Struct(Fields::from(vec![
                                            Field::new("key", DataType::Utf8, false),
                                            Field::new("value", DataType::Float64, true),
                                        ])),
                                        false,
                                    )),
                                    false,
                                ),
                                true,
                            ),
                        ])),
                        true,
                    ))),
                    false,
                ),
            ])),
            true,
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![
                list_of_structs,
                struct_with_lists,
                map_with_struct_values,
                list_of_maps,
                deeply_nested,
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- list_of_structs: list (nullable = true)
        | |-- item: struct (nullable = true)
        | | |-- id: int32 (nullable = false)
        | | |-- name: utf8 (nullable = true)
        | | |-- score: float64 (nullable = true)
        |-- struct_with_lists: struct (nullable = false)
        | |-- tags: list (nullable = true)
        | | |-- item: utf8 (nullable = true)
        | |-- scores: list (nullable = false)
        | | |-- item: int32 (nullable = true)
        | |-- metadata: utf8 (nullable = true)
        |-- map_with_struct_values: map (nullable = true)
        | |-- key: utf8 (nullable = false)
        | |-- value: struct (nullable = true)
        | | |-- count: int64 (nullable = false)
        | | |-- active: boolean (nullable = true)
        |-- list_of_maps: list (nullable = true)
        | |-- item: map (nullable = true)
        | | |-- key: utf8 (nullable = false)
        | | |-- value: int32 (nullable = false)
        |-- deeply_nested: struct (nullable = true)
        | |-- level1: utf8 (nullable = true)
        | |-- level2: list (nullable = false)
        | | |-- item: struct (nullable = true)
        | | | |-- id: int32 (nullable = false)
        | | | |-- properties: map (nullable = true)
        | | | | |-- key: utf8 (nullable = false)
        | | | | |-- value: float64 (nullable = false)
        ");
    }
2402
    /// Renders a schema of less common Arrow types (null, binary variants,
    /// fixed-size list, decimals, dates and times) and compares the tree output
    /// against the inline snapshot.
    #[test]
    fn test_print_schema_edge_case_types() {
        let schema = DFSchema::from_unqualified_fields(
            vec![
                Field::new("null_field", DataType::Null, true),
                Field::new("binary_field", DataType::Binary, false),
                Field::new("large_binary", DataType::LargeBinary, true),
                Field::new("large_utf8", DataType::LargeUtf8, false),
                Field::new("fixed_size_binary", DataType::FixedSizeBinary(16), true),
                Field::new(
                    "fixed_size_list",
                    DataType::FixedSizeList(
                        Arc::new(Field::new("item", DataType::Int32, true)),
                        5,
                    ),
                    false,
                ),
                Field::new("decimal32", DataType::Decimal32(9, 4), true),
                Field::new("decimal64", DataType::Decimal64(9, 4), true),
                Field::new("decimal128", DataType::Decimal128(18, 4), true),
                Field::new("decimal256", DataType::Decimal256(38, 10), false),
                Field::new("date32", DataType::Date32, true),
                Field::new("date64", DataType::Date64, false),
                Field::new(
                    "time32_seconds",
                    DataType::Time32(arrow::datatypes::TimeUnit::Second),
                    true,
                ),
                Field::new(
                    "time64_nanoseconds",
                    DataType::Time64(arrow::datatypes::TimeUnit::Nanosecond),
                    false,
                ),
            ]
            .into(),
            HashMap::new(),
        )
        .unwrap();

        let output = schema.tree_string();

        insta::assert_snapshot!(output, @r"
        root
        |-- null_field: null (nullable = true)
        |-- binary_field: binary (nullable = false)
        |-- large_binary: large_binary (nullable = true)
        |-- large_utf8: large_utf8 (nullable = false)
        |-- fixed_size_binary: fixed_size_binary (nullable = true)
        |-- fixed_size_list: fixed size list (nullable = false)
        | |-- item: int32 (nullable = true)
        |-- decimal32: decimal32(9, 4) (nullable = true)
        |-- decimal64: decimal64(9, 4) (nullable = true)
        |-- decimal128: decimal128(18, 4) (nullable = true)
        |-- decimal256: decimal256(38, 10) (nullable = false)
        |-- date32: date32 (nullable = true)
        |-- date64: date64 (nullable = false)
        |-- time32_seconds: time32 (nullable = true)
        |-- time64_nanoseconds: time64 (nullable = false)
        ");
    }
2464}