1use super::{Between, Expr, Like};
19use crate::expr::FieldMetadata;
20use crate::expr::{
21 AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
22 InSubquery, Lambda, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
23 WindowFunctionParams,
24};
25use crate::type_coercion::functions::{
26 fields_with_aggregate_udf, fields_with_window_udf,
27};
28use crate::{
29 type_coercion::functions::data_types_with_scalar_udf, udf::ReturnFieldArgs, utils,
30 LogicalPlan, Projection, Subquery, WindowFunctionDefinition,
31};
32use arrow::datatypes::FieldRef;
33use arrow::{
34 compute::can_cast_types,
35 datatypes::{DataType, Field},
36};
37use datafusion_common::{
38 not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema,
39 Result, Spans, TableReference,
40};
41use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
42use datafusion_functions_window_common::field::WindowUDFFieldArgs;
43use std::sync::Arc;
44
/// Schema-dependent queries on an expression: its data type, nullability,
/// metadata and output field, all resolved against an [`ExprSchema`].
pub trait ExprSchemable {
    /// Returns the [`DataType`] this expression evaluates to under `schema`.
    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;

    /// Returns whether this expression may evaluate to null under
    /// `input_schema`.
    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;

    /// Returns the [`FieldMetadata`] associated with this expression under
    /// `schema`.
    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;

    /// Returns the optional table qualifier and the [`Field`] describing this
    /// expression's output under `input_schema`.
    fn to_field(
        &self,
        input_schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)>;

    /// Consumes this expression and returns one whose type is `cast_to_type`,
    /// or an error if the conversion is not possible.
    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;

    /// Returns the data type and the nullability of this expression in a
    /// single call.
    fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
        -> Result<(DataType, bool)>;
}
69
70impl ExprSchemable for Expr {
71 #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
112 fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
113 match self {
114 Expr::Alias(Alias { expr, name, .. }) => match &**expr {
115 Expr::Placeholder(Placeholder { field, .. }) => match &field {
116 None => schema.data_type(&Column::from_name(name)).cloned(),
117 Some(field) => Ok(field.data_type().clone()),
118 },
119 _ => expr.get_type(schema),
120 },
121 Expr::Negative(expr) => expr.get_type(schema),
122 Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
123 Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
124 Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
125 Expr::Literal(l, _) => Ok(l.data_type()),
126 Expr::Case(case) => {
127 for (_, then_expr) in &case.when_then_expr {
128 let then_type = then_expr.get_type(schema)?;
129 if !then_type.is_null() {
130 return Ok(then_type);
131 }
132 }
133 case.else_expr
134 .as_ref()
135 .map_or(Ok(DataType::Null), |e| e.get_type(schema))
136 }
137 Expr::Cast(Cast { data_type, .. })
138 | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
139 Expr::Unnest(Unnest { expr }) => {
140 let arg_data_type = expr.get_type(schema)?;
141 match arg_data_type {
143 DataType::List(field)
144 | DataType::LargeList(field)
145 | DataType::FixedSizeList(field, _) => Ok(field.data_type().clone()),
146 DataType::Struct(_) => Ok(arg_data_type),
147 DataType::Null => {
148 not_impl_err!("unnest() does not support null yet")
149 }
150 _ => {
151 plan_err!(
152 "unnest() can only be applied to array, struct and null"
153 )
154 }
155 }
156 }
157 Expr::ScalarFunction(_func) => {
158 let (return_type, _) = self.data_type_and_nullable(schema)?;
159 Ok(return_type)
160 }
161 Expr::WindowFunction(window_function) => self
162 .data_type_and_nullable_with_window_function(schema, window_function)
163 .map(|(return_type, _)| return_type),
164 Expr::AggregateFunction(AggregateFunction {
165 func,
166 params: AggregateFunctionParams { args, .. },
167 }) => {
168 let fields = args
169 .iter()
170 .map(|e| e.to_field(schema).map(|(_, f)| f))
171 .collect::<Result<Vec<_>>>()?;
172 let new_fields = fields_with_aggregate_udf(&fields, func)
173 .map_err(|err| {
174 let data_types = fields
175 .iter()
176 .map(|f| f.data_type().clone())
177 .collect::<Vec<_>>();
178 plan_datafusion_err!(
179 "{} {}",
180 match err {
181 DataFusionError::Plan(msg) => msg,
182 err => err.to_string(),
183 },
184 utils::generate_signature_error_msg(
185 func.name(),
186 func.signature().clone(),
187 &data_types
188 )
189 )
190 })?
191 .into_iter()
192 .collect::<Vec<_>>();
193 Ok(func.return_field(&new_fields)?.data_type().clone())
194 }
195 Expr::Not(_)
196 | Expr::IsNull(_)
197 | Expr::Exists { .. }
198 | Expr::InSubquery(_)
199 | Expr::Between { .. }
200 | Expr::InList { .. }
201 | Expr::IsNotNull(_)
202 | Expr::IsTrue(_)
203 | Expr::IsFalse(_)
204 | Expr::IsUnknown(_)
205 | Expr::IsNotTrue(_)
206 | Expr::IsNotFalse(_)
207 | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
208 Expr::ScalarSubquery(subquery) => {
209 Ok(subquery.subquery.schema().field(0).data_type().clone())
210 }
211 Expr::BinaryExpr(BinaryExpr {
212 ref left,
213 ref right,
214 ref op,
215 }) => BinaryTypeCoercer::new(
216 &left.get_type(schema)?,
217 op,
218 &right.get_type(schema)?,
219 )
220 .get_result_type(),
221 Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
222 Expr::Placeholder(Placeholder { field, .. }) => {
223 if let Some(field) = field {
224 Ok(field.data_type().clone())
225 } else {
226 Ok(DataType::Null)
229 }
230 }
231 #[expect(deprecated)]
232 Expr::Wildcard { .. } => Ok(DataType::Null),
233 Expr::GroupingSet(_) => {
234 Ok(DataType::Null)
236 }
237 Expr::Lambda { .. } => Ok(DataType::Null),
238 }
239 }
240
241 fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
253 match self {
254 Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
255 expr.nullable(input_schema)
256 }
257
258 Expr::InList(InList { expr, list, .. }) => {
259 const MAX_INSPECT_LIMIT: usize = 6;
261 let has_nullable = std::iter::once(expr.as_ref())
263 .chain(list)
264 .take(MAX_INSPECT_LIMIT)
265 .find_map(|e| {
266 e.nullable(input_schema)
267 .map(|nullable| if nullable { Some(()) } else { None })
268 .transpose()
269 })
270 .transpose()?;
271 Ok(match has_nullable {
272 Some(_) => true,
274 None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
276 _ => false,
278 })
279 }
280
281 Expr::Between(Between {
282 expr, low, high, ..
283 }) => Ok(expr.nullable(input_schema)?
284 || low.nullable(input_schema)?
285 || high.nullable(input_schema)?),
286
287 Expr::Column(c) => input_schema.nullable(c),
288 Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
289 Expr::Literal(value, _) => Ok(value.is_null()),
290 Expr::Case(case) => {
291 let then_nullable = case
293 .when_then_expr
294 .iter()
295 .map(|(_, t)| t.nullable(input_schema))
296 .collect::<Result<Vec<_>>>()?;
297 if then_nullable.contains(&true) {
298 Ok(true)
299 } else if let Some(e) = &case.else_expr {
300 e.nullable(input_schema)
301 } else {
302 Ok(true)
305 }
306 }
307 Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
308 Expr::ScalarFunction(_func) => {
309 let (_, nullable) = self.data_type_and_nullable(input_schema)?;
310 Ok(nullable)
311 }
312 Expr::AggregateFunction(AggregateFunction { func, .. }) => {
313 Ok(func.is_nullable())
314 }
315 Expr::WindowFunction(window_function) => self
316 .data_type_and_nullable_with_window_function(
317 input_schema,
318 window_function,
319 )
320 .map(|(_, nullable)| nullable),
321 Expr::Placeholder(Placeholder { id: _, field }) => {
322 Ok(field.as_ref().map(|f| f.is_nullable()).unwrap_or(true))
323 }
324 Expr::ScalarVariable(_, _) | Expr::TryCast { .. } | Expr::Unnest(_) => {
325 Ok(true)
326 }
327 Expr::IsNull(_)
328 | Expr::IsNotNull(_)
329 | Expr::IsTrue(_)
330 | Expr::IsFalse(_)
331 | Expr::IsUnknown(_)
332 | Expr::IsNotTrue(_)
333 | Expr::IsNotFalse(_)
334 | Expr::IsNotUnknown(_)
335 | Expr::Exists { .. } => Ok(false),
336 Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
337 Expr::ScalarSubquery(subquery) => {
338 Ok(subquery.subquery.schema().field(0).is_nullable())
339 }
340 Expr::BinaryExpr(BinaryExpr {
341 ref left,
342 ref right,
343 ..
344 }) => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
345 Expr::Like(Like { expr, pattern, .. })
346 | Expr::SimilarTo(Like { expr, pattern, .. }) => {
347 Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
348 }
349 #[expect(deprecated)]
350 Expr::Wildcard { .. } => Ok(false),
351 Expr::GroupingSet(_) => {
352 Ok(true)
355 }
356 Expr::Lambda { .. } => Ok(false),
357 }
358 }
359
360 fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
361 self.to_field(schema)
362 .map(|(_, field)| FieldMetadata::from(field.metadata()))
363 }
364
365 fn data_type_and_nullable(
376 &self,
377 schema: &dyn ExprSchema,
378 ) -> Result<(DataType, bool)> {
379 let field = self.to_field(schema)?.1;
380
381 Ok((field.data_type().clone(), field.is_nullable()))
382 }
383
/// Builds the output [`Field`] of this expression together with its optional
/// table qualifier.
///
/// The qualifier and the field name both come from `self.qualified_name()`.
/// Whatever field a match arm produces is renamed to that schema name before
/// it is returned, so the names used inside individual arms never leak out.
///
/// # Errors
///
/// Propagates errors from resolving sub-expressions or columns in `schema`,
/// from binary type coercion, and from function signature checks. Signature
/// failures are re-wrapped as plan errors that also carry the text produced
/// by `generate_signature_error_msg`.
fn to_field(
    &self,
    schema: &dyn ExprSchema,
) -> Result<(Option<TableReference>, Arc<Field>)> {
    let (relation, schema_name) = self.qualified_name();
    // NOTE(review): presumably needed for the `Expr::Wildcard` arm below,
    // which other methods in this impl mark with `#[expect(deprecated)]`.
    #[allow(deprecated)]
    let field = match self {
        Expr::Alias(Alias {
            expr,
            name: _,
            metadata,
            ..
        }) => {
            let field = expr.to_field(schema).map(|(_, f)| f.as_ref().clone())?;

            // Start from the aliased expression's metadata, then layer the
            // alias' own metadata (if any) on top of it.
            let mut combined_metadata = expr.metadata(schema)?;
            if let Some(metadata) = metadata {
                combined_metadata.extend(metadata.clone());
            }

            Ok(Arc::new(combined_metadata.add_to_field(field)))
        }
        Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
        Expr::Column(c) => schema.field_from_column(c).map(|f| Arc::new(f.clone())),
        Expr::OuterReferenceColumn(field, _) => {
            Ok(Arc::new(field.as_ref().clone().with_name(&schema_name)))
        }
        // Scalar variables are always reported as nullable.
        Expr::ScalarVariable(ty, _) => {
            Ok(Arc::new(Field::new(&schema_name, ty.clone(), true)))
        }
        Expr::Literal(l, metadata) => {
            // A literal is nullable exactly when its value is null.
            let mut field = Field::new(&schema_name, l.data_type(), l.is_null());
            if let Some(metadata) = metadata {
                field = metadata.add_to_field(field);
            }
            Ok(Arc::new(field))
        }
        // These predicates yield a non-nullable boolean.
        Expr::IsNull(_)
        | Expr::IsNotNull(_)
        | Expr::IsTrue(_)
        | Expr::IsFalse(_)
        | Expr::IsUnknown(_)
        | Expr::IsNotTrue(_)
        | Expr::IsNotFalse(_)
        | Expr::IsNotUnknown(_)
        | Expr::Exists { .. } => {
            Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
        }
        // A scalar subquery reuses the first field of the subquery's schema.
        Expr::ScalarSubquery(subquery) => {
            Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
        }
        Expr::BinaryExpr(BinaryExpr {
            ref left,
            ref right,
            ref op,
        }) => {
            let (lhs_type, lhs_nullable) = left.data_type_and_nullable(schema)?;
            let (rhs_type, rhs_nullable) = right.data_type_and_nullable(schema)?;
            let mut coercer = BinaryTypeCoercer::new(&lhs_type, op, &rhs_type);
            // Hand both operands' spans to the coercer (empty if an operand
            // has none).
            coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
            coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
            // The result is nullable if either operand is.
            Ok(Arc::new(Field::new(
                &schema_name,
                coercer.get_result_type()?,
                lhs_nullable || rhs_nullable,
            )))
        }
        Expr::WindowFunction(window_function) => {
            let (dt, nullable) = self.data_type_and_nullable_with_window_function(
                schema,
                window_function,
            )?;
            Ok(Arc::new(Field::new(&schema_name, dt, nullable)))
        }
        Expr::AggregateFunction(aggregate_function) => {
            let AggregateFunction {
                func,
                params: AggregateFunctionParams { args, .. },
                ..
            } = aggregate_function;

            let fields = args
                .iter()
                .map(|e| e.to_field(schema).map(|(_, f)| f))
                .collect::<Result<Vec<_>>>()?;
            // Run the argument fields through `fields_with_aggregate_udf`;
            // the argument type list is only built if that fails.
            let new_fields = fields_with_aggregate_udf(&fields, func)
                .map_err(|err| {
                    let arg_types = fields
                        .iter()
                        .map(|f| f.data_type())
                        .cloned()
                        .collect::<Vec<_>>();
                    plan_datafusion_err!(
                        "{} {}",
                        match err {
                            DataFusionError::Plan(msg) => msg,
                            err => err.to_string(),
                        },
                        utils::generate_signature_error_msg(
                            func.name(),
                            func.signature().clone(),
                            &arg_types,
                        )
                    )
                })?
                .into_iter()
                .collect::<Vec<_>>();

            func.return_field(&new_fields)
        }
        Expr::ScalarFunction(ScalarFunction { func, args }) => {
            // When any argument is a lambda, each argument is resolved
            // against its own schema obtained from
            // `func.arguments_expr_schema`; a lambda contributes the field
            // of its body. Otherwise every argument is resolved against
            // `schema` directly.
            let fields = if args.iter().any(|arg| matches!(arg, Expr::Lambda(_))) {
                let lambdas_schemas = func.arguments_expr_schema(args, schema)?;

                std::iter::zip(args, lambdas_schemas)
                    // NOTE: `schema` is shadowed here by the per-argument schema.
                    .map(|(e, schema)| match e {
                        Expr::Lambda(Lambda { params: _, body }) => {
                            body.to_field(&schema).map(|(_, f)| f)
                        }
                        _ => e.to_field(&schema).map(|(_, f)| f),
                    })
                    .collect::<Result<Vec<_>>>()?
            } else {
                args.iter()
                    .map(|e| e.to_field(schema).map(|(_, f)| f))
                    .collect::<Result<Vec<_>>>()?
            };

            let arg_types = fields
                .iter()
                .map(|f| f.data_type().clone())
                .collect::<Vec<_>>();

            // Run the argument types through `data_types_with_scalar_udf`;
            // on failure, attach the signature error text.
            let new_data_types = data_types_with_scalar_udf(&arg_types, func)
                .map_err(|err| {
                    plan_datafusion_err!(
                        "{} {}",
                        match err {
                            DataFusionError::Plan(msg) => msg,
                            err => err.to_string(),
                        },
                        utils::generate_signature_error_msg(
                            func.name(),
                            func.signature().clone(),
                            &arg_types,
                        )
                    )
                })?;
            // Keep each argument field as-is except for its data type, which
            // is replaced by the one returned above.
            let new_fields = fields
                .into_iter()
                .zip(new_data_types)
                .map(|(f, d)| f.as_ref().clone().with_data_type(d))
                .map(Arc::new)
                .collect::<Vec<FieldRef>>();

            // One entry per argument: `Some(value)` for literals, `None`
            // for everything else.
            let arguments = args
                .iter()
                .map(|e| match e {
                    Expr::Literal(sv, _) => Some(sv),
                    _ => None,
                })
                .collect::<Vec<_>>();

            // One flag per argument: `true` where the argument is a lambda.
            let lambdas = args
                .iter()
                .map(|e| matches!(e, Expr::Lambda { .. }))
                .collect::<Vec<_>>();

            // NOTE: `args` is shadowed here by the return-field arguments.
            let args = ReturnFieldArgs {
                arg_fields: &new_fields,
                scalar_arguments: &arguments,
                lambdas: &lambdas,
            };

            func.return_field_from_args(args)
        }
        // A cast keeps the input field and only swaps in the target data type.
        Expr::Cast(Cast { expr, data_type }) => expr
            .to_field(schema)
            .map(|(_, f)| f.as_ref().clone().with_data_type(data_type.clone()))
            .map(Arc::new),
        // A placeholder with a declared field reuses it. This arm must stay
        // ahead of the `Expr::Placeholder(_)` catch-all below.
        Expr::Placeholder(Placeholder {
            id: _,
            field: Some(field),
        }) => Ok(field.as_ref().clone().with_name(&schema_name).into()),
        // Everything else: assemble the field from `get_type` and `nullable`.
        Expr::Like(_)
        | Expr::SimilarTo(_)
        | Expr::Not(_)
        | Expr::Between(_)
        | Expr::Case(_)
        | Expr::TryCast(_)
        | Expr::InList(_)
        | Expr::InSubquery(_)
        | Expr::Wildcard { .. }
        | Expr::GroupingSet(_)
        | Expr::Placeholder(_)
        | Expr::Unnest(_)
        | Expr::Lambda(_) => Ok(Arc::new(Field::new(
            &schema_name,
            self.get_type(schema)?,
            self.nullable(schema)?,
        ))),
    }?;

    // Whatever the arm produced, the returned field is named after this
    // expression's schema name.
    Ok((
        relation,
        Arc::new(field.as_ref().clone().with_name(schema_name)),
    ))
}
647
648 fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
655 let this_type = self.get_type(schema)?;
656 if this_type == *cast_to_type {
657 return Ok(self);
658 }
659
660 if can_cast_types(&this_type, cast_to_type) {
665 match self {
666 Expr::ScalarSubquery(subquery) => {
667 Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
668 }
669 _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
670 }
671 } else {
672 plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
673 }
674 }
675}
676
677impl Expr {
678 fn data_type_and_nullable_with_window_function(
688 &self,
689 schema: &dyn ExprSchema,
690 window_function: &WindowFunction,
691 ) -> Result<(DataType, bool)> {
692 let WindowFunction {
693 fun,
694 params: WindowFunctionParams { args, .. },
695 ..
696 } = window_function;
697
698 let fields = args
699 .iter()
700 .map(|e| e.to_field(schema).map(|(_, f)| f))
701 .collect::<Result<Vec<_>>>()?;
702 match fun {
703 WindowFunctionDefinition::AggregateUDF(udaf) => {
704 let data_types = fields
705 .iter()
706 .map(|f| f.data_type())
707 .cloned()
708 .collect::<Vec<_>>();
709 let new_fields = fields_with_aggregate_udf(&fields, udaf)
710 .map_err(|err| {
711 plan_datafusion_err!(
712 "{} {}",
713 match err {
714 DataFusionError::Plan(msg) => msg,
715 err => err.to_string(),
716 },
717 utils::generate_signature_error_msg(
718 fun.name(),
719 fun.signature(),
720 &data_types
721 )
722 )
723 })?
724 .into_iter()
725 .collect::<Vec<_>>();
726
727 let return_field = udaf.return_field(&new_fields)?;
728
729 Ok((return_field.data_type().clone(), return_field.is_nullable()))
730 }
731 WindowFunctionDefinition::WindowUDF(udwf) => {
732 let data_types = fields
733 .iter()
734 .map(|f| f.data_type())
735 .cloned()
736 .collect::<Vec<_>>();
737 let new_fields = fields_with_window_udf(&fields, udwf)
738 .map_err(|err| {
739 plan_datafusion_err!(
740 "{} {}",
741 match err {
742 DataFusionError::Plan(msg) => msg,
743 err => err.to_string(),
744 },
745 utils::generate_signature_error_msg(
746 fun.name(),
747 fun.signature(),
748 &data_types
749 )
750 )
751 })?
752 .into_iter()
753 .collect::<Vec<_>>();
754 let (_, function_name) = self.qualified_name();
755 let field_args = WindowUDFFieldArgs::new(&new_fields, &function_name);
756
757 udwf.field(field_args)
758 .map(|field| (field.data_type().clone(), field.is_nullable()))
759 }
760 }
761 }
762}
763
764pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
772 if subquery.subquery.schema().field(0).data_type() == cast_to_type {
773 return Ok(subquery);
774 }
775
776 let plan = subquery.subquery.as_ref();
777 let new_plan = match plan {
778 LogicalPlan::Projection(projection) => {
779 let cast_expr = projection.expr[0]
780 .clone()
781 .cast_to(cast_to_type, projection.input.schema())?;
782 LogicalPlan::Projection(Projection::try_new(
783 vec![cast_expr],
784 Arc::clone(&projection.input),
785 )?)
786 }
787 _ => {
788 let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
789 .cast_to(cast_to_type, subquery.subquery.schema())?;
790 LogicalPlan::Projection(Projection::try_new(
791 vec![cast_expr],
792 subquery.subquery,
793 )?)
794 }
795 };
796 Ok(Subquery {
797 subquery: Arc::new(new_plan),
798 outer_ref_columns: subquery.outer_ref_columns,
799 spans: Spans::new(),
800 })
801}
802
// Unit tests for the `ExprSchemable` implementation on `Expr`, driven by a
// single-field mock schema (`MockExprSchema`, defined at the bottom).
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{col, lit, out_ref_col_with_metadata};

    use datafusion_common::{internal_err, DFSchema, ScalarValue};

    // Asserts that applying the given `IS ...` builder method to a null
    // literal yields a non-nullable expression.
    macro_rules! test_is_expr_nullable {
        ($EXPR_TYPE:ident) => {{
            let expr = lit(ScalarValue::Null).$EXPR_TYPE();
            assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        }};
    }

    #[test]
    fn expr_schema_nullability() {
        // A comparison is nullable exactly when its column operand is.
        let expr = col("foo").eq(lit(1));
        assert!(!expr.nullable(&MockExprSchema::new()).unwrap());
        assert!(expr
            .nullable(&MockExprSchema::new().with_nullable(true))
            .unwrap());

        // `IS ...` predicates are never nullable, even over a null input.
        test_is_expr_nullable!(is_null);
        test_is_expr_nullable!(is_not_null);
        test_is_expr_nullable!(is_true);
        test_is_expr_nullable!(is_not_true);
        test_is_expr_nullable!(is_false);
        test_is_expr_nullable!(is_not_false);
        test_is_expr_nullable!(is_unknown);
        test_is_expr_nullable!(is_not_unknown);
    }

    #[test]
    fn test_between_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        // With non-null bounds, nullability follows the tested column.
        let expr = col("foo").between(lit(1), lit(2));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        let null = lit(ScalarValue::Int32(None));

        // A null low bound, high bound, or both makes the result nullable.
        let expr = col("foo").between(null.clone(), lit(2));
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(lit(1), null.clone());
        assert!(expr.nullable(&get_schema(false)).unwrap());

        let expr = col("foo").between(null.clone(), null);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn test_inlist_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Int32)
                .with_nullable(nullable)
        };

        // Five items plus the tested column: exactly at the inspection limit.
        let expr = col("foo").in_list(vec![lit(1); 5], false);
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());
        // An error from the schema's `nullable` lookup is propagated.
        assert!(expr
            .nullable(&get_schema(false).with_error_on_nullable(true))
            .is_err());

        // A null item in the list makes the result nullable.
        let null = lit(ScalarValue::Int32(None));
        let expr = col("foo").in_list(vec![null, lit(1)], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());

        // Six items exceed the inspection limit, so the list is reported
        // nullable even though nothing in it is.
        let expr = col("foo").in_list(vec![lit(1); 6], false);
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn test_like_nullability() {
        let get_schema = |nullable| {
            MockExprSchema::new()
                .with_data_type(DataType::Utf8)
                .with_nullable(nullable)
        };

        // With a non-null pattern, nullability follows the tested column.
        let expr = col("foo").like(lit("bar"));
        assert!(!expr.nullable(&get_schema(false)).unwrap());
        assert!(expr.nullable(&get_schema(true)).unwrap());

        // A null pattern makes the result nullable.
        let expr = col("foo").like(lit(ScalarValue::Utf8(None)));
        assert!(expr.nullable(&get_schema(false)).unwrap());
    }

    #[test]
    fn expr_schema_data_type() {
        // A column takes its data type from the schema.
        let expr = col("foo");
        assert_eq!(
            DataType::Utf8,
            expr.get_type(&MockExprSchema::new().with_data_type(DataType::Utf8))
                .unwrap()
        );
    }

    #[test]
    fn test_expr_metadata() {
        let mut meta = HashMap::new();
        meta.insert("bar".to_string(), "buzz".to_string());
        let meta = FieldMetadata::from(meta);
        let expr = col("foo");
        let schema = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_metadata(meta.clone());

        // Column, alias and cast all preserve the schema field's metadata.
        assert_eq!(meta, expr.metadata(&schema).unwrap());
        assert_eq!(meta, expr.clone().alias("bar").metadata(&schema).unwrap());
        assert_eq!(
            meta,
            expr.clone()
                .cast_to(&DataType::Int64, &schema)
                .unwrap()
                .metadata(&schema)
                .unwrap()
        );

        let schema = DFSchema::from_unqualified_fields(
            vec![meta.add_to_field(Field::new("foo", DataType::Int32, true))].into(),
            HashMap::new(),
        )
        .unwrap();

        // The same holds against a real `DFSchema`.
        assert_eq!(meta, expr.metadata(&schema).unwrap());

        // An outer reference built with metadata reports that metadata.
        let outer_ref = out_ref_col_with_metadata(
            DataType::Int32,
            meta.to_hashmap(),
            Column::from_name("foo"),
        );
        assert_eq!(meta, outer_ref.metadata(&schema).unwrap());
    }

    #[test]
    fn test_expr_placeholder() {
        let schema = MockExprSchema::new();

        let mut placeholder_meta = HashMap::new();
        placeholder_meta.insert("bar".to_string(), "buzz".to_string());
        let placeholder_meta = FieldMetadata::from(placeholder_meta);

        // A placeholder with a nullable field that carries metadata.
        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(
                Field::new("", DataType::Utf8, true)
                    .with_metadata(placeholder_meta.to_hashmap())
                    .into(),
            ),
        ));

        assert_eq!(
            expr.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr.metadata(&schema).unwrap());

        // Aliasing the placeholder preserves type, nullability and metadata.
        let expr_alias = expr.alias("a placeholder by any other name");
        assert_eq!(
            expr_alias.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, expr_alias.metadata(&schema).unwrap());

        // A placeholder with a non-nullable field is reported as non-nullable,
        // both directly and through an alias.
        let expr = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(Field::new("", DataType::Utf8, false).into()),
        ));
        assert_eq!(
            expr.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, false)
        );
        let expr_alias = expr.alias("a placeholder by any other name");
        assert_eq!(
            expr_alias.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, false)
        );
    }

    // Schema stub that answers every column lookup with the same field and
    // can be told to fail nullability lookups.
    #[derive(Debug)]
    struct MockExprSchema {
        // The field returned for every column.
        field: Field,
        // When true, `nullable` returns an internal error instead of a value.
        error_on_nullable: bool,
    }

    impl MockExprSchema {
        // Starts with a non-nullable `Null`-typed field and no forced error.
        fn new() -> Self {
            Self {
                field: Field::new("mock_field", DataType::Null, false),
                error_on_nullable: false,
            }
        }

        // Builder: sets the nullability of the mock field.
        fn with_nullable(mut self, nullable: bool) -> Self {
            self.field = self.field.with_nullable(nullable);
            self
        }

        // Builder: sets the data type of the mock field.
        fn with_data_type(mut self, data_type: DataType) -> Self {
            self.field = self.field.with_data_type(data_type);
            self
        }

        // Builder: makes `nullable` lookups fail when `error_on_nullable` is true.
        fn with_error_on_nullable(mut self, error_on_nullable: bool) -> Self {
            self.error_on_nullable = error_on_nullable;
            self
        }

        // Builder: attaches `metadata` to the mock field.
        fn with_metadata(mut self, metadata: FieldMetadata) -> Self {
            self.field = metadata.add_to_field(self.field);
            self
        }
    }

    impl ExprSchema for MockExprSchema {
        // Ignores the column: either fails (if configured to) or reports the
        // mock field's nullability.
        fn nullable(&self, _col: &Column) -> Result<bool> {
            if self.error_on_nullable {
                internal_err!("nullable error")
            } else {
                Ok(self.field.is_nullable())
            }
        }

        // Ignores the column and always returns the mock field.
        fn field_from_column(&self, _col: &Column) -> Result<&Field> {
            Ok(&self.field)
        }
    }
}