datafusion_expr/
expr_schema.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use super::{Between, Expr, Like};
19use crate::expr::FieldMetadata;
20use crate::expr::{
21    AggregateFunction, AggregateFunctionParams, Alias, BinaryExpr, Cast, InList,
22    InSubquery, Lambda, Placeholder, ScalarFunction, TryCast, Unnest, WindowFunction,
23    WindowFunctionParams,
24};
25use crate::type_coercion::functions::{
26    fields_with_aggregate_udf, fields_with_window_udf,
27};
28use crate::{
29    type_coercion::functions::data_types_with_scalar_udf, udf::ReturnFieldArgs, utils,
30    LogicalPlan, Projection, Subquery, WindowFunctionDefinition,
31};
32use arrow::datatypes::FieldRef;
33use arrow::{
34    compute::can_cast_types,
35    datatypes::{DataType, Field},
36};
37use datafusion_common::{
38    not_impl_err, plan_datafusion_err, plan_err, Column, DataFusionError, ExprSchema,
39    Result, Spans, TableReference,
40};
41use datafusion_expr_common::type_coercion::binary::BinaryTypeCoercer;
42use datafusion_functions_window_common::field::WindowUDFFieldArgs;
43use std::sync::Arc;
44
/// Trait that allows an expression to be typed with respect to a schema.
///
/// Each method takes the [`ExprSchema`] against which the expression is
/// resolved and reports one aspect of the expression's output: its data
/// type, nullability, field metadata, or the complete output field.
pub trait ExprSchemable {
    /// Given a schema, return the type of the expr
    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType>;

    /// Given a schema, return the nullability of the expr
    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool>;

    /// Given a schema, return the field metadata of the expr
    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata>;

    /// Convert to a field with respect to a schema.
    ///
    /// Returns the optional table qualifier together with the field.
    fn to_field(
        &self,
        input_schema: &dyn ExprSchema,
    ) -> Result<(Option<TableReference>, Arc<Field>)>;

    /// Cast to a type with respect to a schema.
    ///
    /// Consumes the expression and returns the resulting expression.
    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr>;

    /// Given a schema, return the type and nullability of the expr
    fn data_type_and_nullable(&self, schema: &dyn ExprSchema)
        -> Result<(DataType, bool)>;
}
69
70impl ExprSchemable for Expr {
71    /// Returns the [arrow::datatypes::DataType] of the expression
72    /// based on [ExprSchema]
73    ///
74    /// Note: [`DFSchema`] implements [ExprSchema].
75    ///
76    /// [`DFSchema`]: datafusion_common::DFSchema
77    ///
78    /// # Examples
79    ///
80    /// Get the type of an expression that adds 2 columns. Adding an Int32
81    /// and Float32 results in Float32 type
82    ///
83    /// ```
84    /// # use arrow::datatypes::{DataType, Field};
85    /// # use datafusion_common::DFSchema;
86    /// # use datafusion_expr::{col, ExprSchemable};
87    /// # use std::collections::HashMap;
88    ///
89    /// fn main() {
90    ///     let expr = col("c1") + col("c2");
91    ///     let schema = DFSchema::from_unqualified_fields(
92    ///         vec![
93    ///             Field::new("c1", DataType::Int32, true),
94    ///             Field::new("c2", DataType::Float32, true),
95    ///         ]
96    ///         .into(),
97    ///         HashMap::new(),
98    ///     )
99    ///     .unwrap();
100    ///     assert_eq!("Float32", format!("{}", expr.get_type(&schema).unwrap()));
101    /// }
102    /// ```
103    ///
104    /// # Errors
105    ///
106    /// This function errors when it is not possible to compute its
107    /// [arrow::datatypes::DataType].  This happens when e.g. the
108    /// expression refers to a column that does not exist in the
109    /// schema, or when the expression is incorrectly typed
110    /// (e.g. `[utf8] + [bool]`).
111    #[cfg_attr(feature = "recursive_protection", recursive::recursive)]
112    fn get_type(&self, schema: &dyn ExprSchema) -> Result<DataType> {
113        match self {
114            Expr::Alias(Alias { expr, name, .. }) => match &**expr {
115                Expr::Placeholder(Placeholder { field, .. }) => match &field {
116                    None => schema.data_type(&Column::from_name(name)).cloned(),
117                    Some(field) => Ok(field.data_type().clone()),
118                },
119                _ => expr.get_type(schema),
120            },
121            Expr::Negative(expr) => expr.get_type(schema),
122            Expr::Column(c) => Ok(schema.data_type(c)?.clone()),
123            Expr::OuterReferenceColumn(field, _) => Ok(field.data_type().clone()),
124            Expr::ScalarVariable(ty, _) => Ok(ty.clone()),
125            Expr::Literal(l, _) => Ok(l.data_type()),
126            Expr::Case(case) => {
127                for (_, then_expr) in &case.when_then_expr {
128                    let then_type = then_expr.get_type(schema)?;
129                    if !then_type.is_null() {
130                        return Ok(then_type);
131                    }
132                }
133                case.else_expr
134                    .as_ref()
135                    .map_or(Ok(DataType::Null), |e| e.get_type(schema))
136            }
137            Expr::Cast(Cast { data_type, .. })
138            | Expr::TryCast(TryCast { data_type, .. }) => Ok(data_type.clone()),
139            Expr::Unnest(Unnest { expr }) => {
140                let arg_data_type = expr.get_type(schema)?;
141                // Unnest's output type is the inner type of the list
142                match arg_data_type {
143                    DataType::List(field)
144                    | DataType::LargeList(field)
145                    | DataType::FixedSizeList(field, _) => Ok(field.data_type().clone()),
146                    DataType::Struct(_) => Ok(arg_data_type),
147                    DataType::Null => {
148                        not_impl_err!("unnest() does not support null yet")
149                    }
150                    _ => {
151                        plan_err!(
152                            "unnest() can only be applied to array, struct and null"
153                        )
154                    }
155                }
156            }
157            Expr::ScalarFunction(_func) => {
158                let (return_type, _) = self.data_type_and_nullable(schema)?;
159                Ok(return_type)
160            }
161            Expr::WindowFunction(window_function) => self
162                .data_type_and_nullable_with_window_function(schema, window_function)
163                .map(|(return_type, _)| return_type),
164            Expr::AggregateFunction(AggregateFunction {
165                func,
166                params: AggregateFunctionParams { args, .. },
167            }) => {
168                let fields = args
169                    .iter()
170                    .map(|e| e.to_field(schema).map(|(_, f)| f))
171                    .collect::<Result<Vec<_>>>()?;
172                let new_fields = fields_with_aggregate_udf(&fields, func)
173                    .map_err(|err| {
174                        let data_types = fields
175                            .iter()
176                            .map(|f| f.data_type().clone())
177                            .collect::<Vec<_>>();
178                        plan_datafusion_err!(
179                            "{} {}",
180                            match err {
181                                DataFusionError::Plan(msg) => msg,
182                                err => err.to_string(),
183                            },
184                            utils::generate_signature_error_msg(
185                                func.name(),
186                                func.signature().clone(),
187                                &data_types
188                            )
189                        )
190                    })?
191                    .into_iter()
192                    .collect::<Vec<_>>();
193                Ok(func.return_field(&new_fields)?.data_type().clone())
194            }
195            Expr::Not(_)
196            | Expr::IsNull(_)
197            | Expr::Exists { .. }
198            | Expr::InSubquery(_)
199            | Expr::Between { .. }
200            | Expr::InList { .. }
201            | Expr::IsNotNull(_)
202            | Expr::IsTrue(_)
203            | Expr::IsFalse(_)
204            | Expr::IsUnknown(_)
205            | Expr::IsNotTrue(_)
206            | Expr::IsNotFalse(_)
207            | Expr::IsNotUnknown(_) => Ok(DataType::Boolean),
208            Expr::ScalarSubquery(subquery) => {
209                Ok(subquery.subquery.schema().field(0).data_type().clone())
210            }
211            Expr::BinaryExpr(BinaryExpr {
212                ref left,
213                ref right,
214                ref op,
215            }) => BinaryTypeCoercer::new(
216                &left.get_type(schema)?,
217                op,
218                &right.get_type(schema)?,
219            )
220            .get_result_type(),
221            Expr::Like { .. } | Expr::SimilarTo { .. } => Ok(DataType::Boolean),
222            Expr::Placeholder(Placeholder { field, .. }) => {
223                if let Some(field) = field {
224                    Ok(field.data_type().clone())
225                } else {
226                    // If the placeholder's type hasn't been specified, treat it as
227                    // null (unspecified placeholders generate an error during planning)
228                    Ok(DataType::Null)
229                }
230            }
231            #[expect(deprecated)]
232            Expr::Wildcard { .. } => Ok(DataType::Null),
233            Expr::GroupingSet(_) => {
234                // Grouping sets do not really have a type and do not appear in projections
235                Ok(DataType::Null)
236            }
237            Expr::Lambda { .. } => Ok(DataType::Null),
238        }
239    }
240
241    /// Returns the nullability of the expression based on [ExprSchema].
242    ///
243    /// Note: [`DFSchema`] implements [ExprSchema].
244    ///
245    /// [`DFSchema`]: datafusion_common::DFSchema
246    ///
247    /// # Errors
248    ///
249    /// This function errors when it is not possible to compute its
250    /// nullability.  This happens when the expression refers to a
251    /// column that does not exist in the schema.
252    fn nullable(&self, input_schema: &dyn ExprSchema) -> Result<bool> {
253        match self {
254            Expr::Alias(Alias { expr, .. }) | Expr::Not(expr) | Expr::Negative(expr) => {
255                expr.nullable(input_schema)
256            }
257
258            Expr::InList(InList { expr, list, .. }) => {
259                // Avoid inspecting too many expressions.
260                const MAX_INSPECT_LIMIT: usize = 6;
261                // Stop if a nullable expression is found or an error occurs.
262                let has_nullable = std::iter::once(expr.as_ref())
263                    .chain(list)
264                    .take(MAX_INSPECT_LIMIT)
265                    .find_map(|e| {
266                        e.nullable(input_schema)
267                            .map(|nullable| if nullable { Some(()) } else { None })
268                            .transpose()
269                    })
270                    .transpose()?;
271                Ok(match has_nullable {
272                    // If a nullable subexpression is found, the result may also be nullable.
273                    Some(_) => true,
274                    // If the list is too long, we assume it is nullable.
275                    None if list.len() + 1 > MAX_INSPECT_LIMIT => true,
276                    // All the subexpressions are non-nullable, so the result must be non-nullable.
277                    _ => false,
278                })
279            }
280
281            Expr::Between(Between {
282                expr, low, high, ..
283            }) => Ok(expr.nullable(input_schema)?
284                || low.nullable(input_schema)?
285                || high.nullable(input_schema)?),
286
287            Expr::Column(c) => input_schema.nullable(c),
288            Expr::OuterReferenceColumn(field, _) => Ok(field.is_nullable()),
289            Expr::Literal(value, _) => Ok(value.is_null()),
290            Expr::Case(case) => {
291                // This expression is nullable if any of the input expressions are nullable
292                let then_nullable = case
293                    .when_then_expr
294                    .iter()
295                    .map(|(_, t)| t.nullable(input_schema))
296                    .collect::<Result<Vec<_>>>()?;
297                if then_nullable.contains(&true) {
298                    Ok(true)
299                } else if let Some(e) = &case.else_expr {
300                    e.nullable(input_schema)
301                } else {
302                    // CASE produces NULL if there is no `else` expr
303                    // (aka when none of the `when_then_exprs` match)
304                    Ok(true)
305                }
306            }
307            Expr::Cast(Cast { expr, .. }) => expr.nullable(input_schema),
308            Expr::ScalarFunction(_func) => {
309                let (_, nullable) = self.data_type_and_nullable(input_schema)?;
310                Ok(nullable)
311            }
312            Expr::AggregateFunction(AggregateFunction { func, .. }) => {
313                Ok(func.is_nullable())
314            }
315            Expr::WindowFunction(window_function) => self
316                .data_type_and_nullable_with_window_function(
317                    input_schema,
318                    window_function,
319                )
320                .map(|(_, nullable)| nullable),
321            Expr::Placeholder(Placeholder { id: _, field }) => {
322                Ok(field.as_ref().map(|f| f.is_nullable()).unwrap_or(true))
323            }
324            Expr::ScalarVariable(_, _) | Expr::TryCast { .. } | Expr::Unnest(_) => {
325                Ok(true)
326            }
327            Expr::IsNull(_)
328            | Expr::IsNotNull(_)
329            | Expr::IsTrue(_)
330            | Expr::IsFalse(_)
331            | Expr::IsUnknown(_)
332            | Expr::IsNotTrue(_)
333            | Expr::IsNotFalse(_)
334            | Expr::IsNotUnknown(_)
335            | Expr::Exists { .. } => Ok(false),
336            Expr::InSubquery(InSubquery { expr, .. }) => expr.nullable(input_schema),
337            Expr::ScalarSubquery(subquery) => {
338                Ok(subquery.subquery.schema().field(0).is_nullable())
339            }
340            Expr::BinaryExpr(BinaryExpr {
341                ref left,
342                ref right,
343                ..
344            }) => Ok(left.nullable(input_schema)? || right.nullable(input_schema)?),
345            Expr::Like(Like { expr, pattern, .. })
346            | Expr::SimilarTo(Like { expr, pattern, .. }) => {
347                Ok(expr.nullable(input_schema)? || pattern.nullable(input_schema)?)
348            }
349            #[expect(deprecated)]
350            Expr::Wildcard { .. } => Ok(false),
351            Expr::GroupingSet(_) => {
352                // Grouping sets do not really have the concept of nullable and do not appear
353                // in projections
354                Ok(true)
355            }
356            Expr::Lambda { .. } => Ok(false),
357        }
358    }
359
360    fn metadata(&self, schema: &dyn ExprSchema) -> Result<FieldMetadata> {
361        self.to_field(schema)
362            .map(|(_, field)| FieldMetadata::from(field.metadata()))
363    }
364
365    /// Returns the datatype and nullability of the expression based on [ExprSchema].
366    ///
367    /// Note: [`DFSchema`] implements [ExprSchema].
368    ///
369    /// [`DFSchema`]: datafusion_common::DFSchema
370    ///
371    /// # Errors
372    ///
373    /// This function errors when it is not possible to compute its
374    /// datatype or nullability.
375    fn data_type_and_nullable(
376        &self,
377        schema: &dyn ExprSchema,
378    ) -> Result<(DataType, bool)> {
379        let field = self.to_field(schema)?.1;
380
381        Ok((field.data_type().clone(), field.is_nullable()))
382    }
383
384    /// Returns a [arrow::datatypes::Field] compatible with this expression.
385    ///
386    /// This function converts an expression into a field with appropriate metadata
387    /// and nullability based on the expression type and context. It is the primary
388    /// mechanism for determining field-level schemas.
389    ///
390    /// # Field Property Resolution
391    ///
392    /// For each expression, the following properties are determined:
393    ///
394    /// ## Data Type Resolution
395    /// - **Column references**: Data type from input schema field
396    /// - **Literals**: Data type inferred from literal value
397    /// - **Aliases**: Data type inherited from the underlying expression (the aliased expression)
398    /// - **Binary expressions**: Result type from type coercion rules
399    /// - **Boolean expressions**: Always a boolean type
400    /// - **Cast expressions**: Target data type from cast operation
401    /// - **Function calls**: Return type based on function signature and argument types
402    ///
403    /// ## Nullability Determination
404    /// - **Column references**: Inherit nullability from input schema field
405    /// - **Literals**: Nullable only if literal value is NULL
406    /// - **Aliases**: Inherit nullability from the underlying expression (the aliased expression)
407    /// - **Binary expressions**: Nullable if either operand is nullable
408    /// - **Boolean expressions**: Always non-nullable (IS NULL, EXISTS, etc.)
409    /// - **Cast expressions**: determined by the input expression's nullability rules
410    /// - **Function calls**: Based on function nullability rules and input nullability
411    ///
412    /// ## Metadata Handling
413    /// - **Column references**: Preserve original field metadata from input schema
414    /// - **Literals**: Use explicitly provided metadata, otherwise empty
415    /// - **Aliases**: Merge underlying expr metadata with alias-specific metadata, preferring the alias metadata
416    /// - **Binary expressions**: field metadata is empty
417    /// - **Boolean expressions**: field metadata is empty
418    /// - **Cast expressions**: determined by the input expression's field metadata handling
419    /// - **Scalar functions**: Generate metadata via function's [`return_field_from_args`] method,
420    ///   with the default implementation returning empty field metadata
421    /// - **Aggregate functions**: Generate metadata via function's [`return_field`] method,
422    ///   with the default implementation returning empty field metadata
423    /// - **Window functions**: field metadata is empty
424    ///
425    /// ## Table Reference Scoping
426    /// - Establishes proper qualified field references when columns belong to specific tables
427    /// - Maintains table context for accurate field resolution in multi-table scenarios
428    ///
429    /// So for example, a projected expression `col(c1) + col(c2)` is
430    /// placed in an output field **named** col("c1 + c2")
431    ///
432    /// [`return_field_from_args`]: crate::ScalarUDF::return_field_from_args
433    /// [`return_field`]: crate::AggregateUDF::return_field
434    fn to_field(
435        &self,
436        schema: &dyn ExprSchema,
437    ) -> Result<(Option<TableReference>, Arc<Field>)> {
438        let (relation, schema_name) = self.qualified_name();
439        #[allow(deprecated)]
440        let field = match self {
441            Expr::Alias(Alias {
442                expr,
443                name: _,
444                metadata,
445                ..
446            }) => {
447                let field = expr.to_field(schema).map(|(_, f)| f.as_ref().clone())?;
448
449                let mut combined_metadata = expr.metadata(schema)?;
450                if let Some(metadata) = metadata {
451                    combined_metadata.extend(metadata.clone());
452                }
453
454                Ok(Arc::new(combined_metadata.add_to_field(field)))
455            }
456            Expr::Negative(expr) => expr.to_field(schema).map(|(_, f)| f),
457            Expr::Column(c) => schema.field_from_column(c).map(|f| Arc::new(f.clone())),
458            Expr::OuterReferenceColumn(field, _) => {
459                Ok(Arc::new(field.as_ref().clone().with_name(&schema_name)))
460            }
461            Expr::ScalarVariable(ty, _) => {
462                Ok(Arc::new(Field::new(&schema_name, ty.clone(), true)))
463            }
464            Expr::Literal(l, metadata) => {
465                let mut field = Field::new(&schema_name, l.data_type(), l.is_null());
466                if let Some(metadata) = metadata {
467                    field = metadata.add_to_field(field);
468                }
469                Ok(Arc::new(field))
470            }
471            Expr::IsNull(_)
472            | Expr::IsNotNull(_)
473            | Expr::IsTrue(_)
474            | Expr::IsFalse(_)
475            | Expr::IsUnknown(_)
476            | Expr::IsNotTrue(_)
477            | Expr::IsNotFalse(_)
478            | Expr::IsNotUnknown(_)
479            | Expr::Exists { .. } => {
480                Ok(Arc::new(Field::new(&schema_name, DataType::Boolean, false)))
481            }
482            Expr::ScalarSubquery(subquery) => {
483                Ok(Arc::clone(&subquery.subquery.schema().fields()[0]))
484            }
485            Expr::BinaryExpr(BinaryExpr {
486                ref left,
487                ref right,
488                ref op,
489            }) => {
490                let (lhs_type, lhs_nullable) = left.data_type_and_nullable(schema)?;
491                let (rhs_type, rhs_nullable) = right.data_type_and_nullable(schema)?;
492                let mut coercer = BinaryTypeCoercer::new(&lhs_type, op, &rhs_type);
493                coercer.set_lhs_spans(left.spans().cloned().unwrap_or_default());
494                coercer.set_rhs_spans(right.spans().cloned().unwrap_or_default());
495                Ok(Arc::new(Field::new(
496                    &schema_name,
497                    coercer.get_result_type()?,
498                    lhs_nullable || rhs_nullable,
499                )))
500            }
501            Expr::WindowFunction(window_function) => {
502                let (dt, nullable) = self.data_type_and_nullable_with_window_function(
503                    schema,
504                    window_function,
505                )?;
506                Ok(Arc::new(Field::new(&schema_name, dt, nullable)))
507            }
508            Expr::AggregateFunction(aggregate_function) => {
509                let AggregateFunction {
510                    func,
511                    params: AggregateFunctionParams { args, .. },
512                    ..
513                } = aggregate_function;
514
515                let fields = args
516                    .iter()
517                    .map(|e| e.to_field(schema).map(|(_, f)| f))
518                    .collect::<Result<Vec<_>>>()?;
519                // Verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
520                let new_fields = fields_with_aggregate_udf(&fields, func)
521                    .map_err(|err| {
522                        let arg_types = fields
523                            .iter()
524                            .map(|f| f.data_type())
525                            .cloned()
526                            .collect::<Vec<_>>();
527                        plan_datafusion_err!(
528                            "{} {}",
529                            match err {
530                                DataFusionError::Plan(msg) => msg,
531                                err => err.to_string(),
532                            },
533                            utils::generate_signature_error_msg(
534                                func.name(),
535                                func.signature().clone(),
536                                &arg_types,
537                            )
538                        )
539                    })?
540                    .into_iter()
541                    .collect::<Vec<_>>();
542
543                func.return_field(&new_fields)
544            }
545            // Expr::Lambda(Lambda { params, body}) => body.to_field(schema),
546            Expr::ScalarFunction(ScalarFunction { func, args }) => {
547                let fields = if args.iter().any(|arg| matches!(arg, Expr::Lambda(_))) {
548                    let lambdas_schemas = func.arguments_expr_schema(args, schema)?;
549
550                    std::iter::zip(args, lambdas_schemas)
551                        // .map(|(e, schema)| e.to_field(schema).map(|(_, f)| f))
552                        .map(|(e, schema)| match e {
553                            Expr::Lambda(Lambda { params: _, body }) => {
554                                body.to_field(&schema).map(|(_, f)| f)
555                            }
556                            _ => e.to_field(&schema).map(|(_, f)| f),
557                        })
558                        .collect::<Result<Vec<_>>>()?
559                } else {
560                    args.iter()
561                        .map(|e| e.to_field(schema).map(|(_, f)| f))
562                        .collect::<Result<Vec<_>>>()?
563                };
564
565                let arg_types = fields
566                    .iter()
567                    .map(|f| f.data_type().clone())
568                    .collect::<Vec<_>>();
569
570                // Verify that function is invoked with correct number and type of arguments as defined in `TypeSignature`
571                let new_data_types = data_types_with_scalar_udf(&arg_types, func)
572                    .map_err(|err| {
573                        plan_datafusion_err!(
574                            "{} {}",
575                            match err {
576                                DataFusionError::Plan(msg) => msg,
577                                err => err.to_string(),
578                            },
579                            utils::generate_signature_error_msg(
580                                func.name(),
581                                func.signature().clone(),
582                                &arg_types,
583                            )
584                        )
585                    })?;
586                let new_fields = fields
587                    .into_iter()
588                    .zip(new_data_types)
589                    .map(|(f, d)| f.as_ref().clone().with_data_type(d))
590                    .map(Arc::new)
591                    .collect::<Vec<FieldRef>>();
592
593                let arguments = args
594                    .iter()
595                    .map(|e| match e {
596                        Expr::Literal(sv, _) => Some(sv),
597                        _ => None,
598                    })
599                    .collect::<Vec<_>>();
600
601                let lambdas = args
602                    .iter()
603                    .map(|e| matches!(e, Expr::Lambda { .. }))
604                    .collect::<Vec<_>>();
605
606                let args = ReturnFieldArgs {
607                    arg_fields: &new_fields,
608                    scalar_arguments: &arguments,
609                    lambdas: &lambdas,
610                };
611
612                func.return_field_from_args(args)
613            }
614            // _ => Ok((self.get_type(schema)?, self.nullable(schema)?)),
615            Expr::Cast(Cast { expr, data_type }) => expr
616                .to_field(schema)
617                .map(|(_, f)| f.as_ref().clone().with_data_type(data_type.clone()))
618                .map(Arc::new),
619            Expr::Placeholder(Placeholder {
620                id: _,
621                field: Some(field),
622            }) => Ok(field.as_ref().clone().with_name(&schema_name).into()),
623            Expr::Like(_)
624            | Expr::SimilarTo(_)
625            | Expr::Not(_)
626            | Expr::Between(_)
627            | Expr::Case(_)
628            | Expr::TryCast(_)
629            | Expr::InList(_)
630            | Expr::InSubquery(_)
631            | Expr::Wildcard { .. }
632            | Expr::GroupingSet(_)
633            | Expr::Placeholder(_)
634            | Expr::Unnest(_)
635            | Expr::Lambda(_) => Ok(Arc::new(Field::new(
636                &schema_name,
637                self.get_type(schema)?,
638                self.nullable(schema)?,
639            ))),
640        }?;
641
642        Ok((
643            relation,
644            Arc::new(field.as_ref().clone().with_name(schema_name)),
645        ))
646    }
647
648    /// Wraps this expression in a cast to a target [arrow::datatypes::DataType].
649    ///
650    /// # Errors
651    ///
652    /// This function errors when it is impossible to cast the
653    /// expression to the target [arrow::datatypes::DataType].
654    fn cast_to(self, cast_to_type: &DataType, schema: &dyn ExprSchema) -> Result<Expr> {
655        let this_type = self.get_type(schema)?;
656        if this_type == *cast_to_type {
657            return Ok(self);
658        }
659
660        // TODO(kszucs): Most of the operations do not validate the type correctness
661        // like all of the binary expressions below. Perhaps Expr should track the
662        // type of the expression?
663
664        if can_cast_types(&this_type, cast_to_type) {
665            match self {
666                Expr::ScalarSubquery(subquery) => {
667                    Ok(Expr::ScalarSubquery(cast_subquery(subquery, cast_to_type)?))
668                }
669                _ => Ok(Expr::Cast(Cast::new(Box::new(self), cast_to_type.clone()))),
670            }
671        } else {
672            plan_err!("Cannot automatically convert {this_type} to {cast_to_type}")
673        }
674    }
675}
676
677impl Expr {
678    /// Common method for window functions that applies type coercion
679    /// to all arguments of the window function to check if it matches
680    /// its signature.
681    ///
682    /// If successful, this method returns the data type and
683    /// nullability of the window function's result.
684    ///
685    /// Otherwise, returns an error if there's a type mismatch between
686    /// the window function's signature and the provided arguments.
687    fn data_type_and_nullable_with_window_function(
688        &self,
689        schema: &dyn ExprSchema,
690        window_function: &WindowFunction,
691    ) -> Result<(DataType, bool)> {
692        let WindowFunction {
693            fun,
694            params: WindowFunctionParams { args, .. },
695            ..
696        } = window_function;
697
698        let fields = args
699            .iter()
700            .map(|e| e.to_field(schema).map(|(_, f)| f))
701            .collect::<Result<Vec<_>>>()?;
702        match fun {
703            WindowFunctionDefinition::AggregateUDF(udaf) => {
704                let data_types = fields
705                    .iter()
706                    .map(|f| f.data_type())
707                    .cloned()
708                    .collect::<Vec<_>>();
709                let new_fields = fields_with_aggregate_udf(&fields, udaf)
710                    .map_err(|err| {
711                        plan_datafusion_err!(
712                            "{} {}",
713                            match err {
714                                DataFusionError::Plan(msg) => msg,
715                                err => err.to_string(),
716                            },
717                            utils::generate_signature_error_msg(
718                                fun.name(),
719                                fun.signature(),
720                                &data_types
721                            )
722                        )
723                    })?
724                    .into_iter()
725                    .collect::<Vec<_>>();
726
727                let return_field = udaf.return_field(&new_fields)?;
728
729                Ok((return_field.data_type().clone(), return_field.is_nullable()))
730            }
731            WindowFunctionDefinition::WindowUDF(udwf) => {
732                let data_types = fields
733                    .iter()
734                    .map(|f| f.data_type())
735                    .cloned()
736                    .collect::<Vec<_>>();
737                let new_fields = fields_with_window_udf(&fields, udwf)
738                    .map_err(|err| {
739                        plan_datafusion_err!(
740                            "{} {}",
741                            match err {
742                                DataFusionError::Plan(msg) => msg,
743                                err => err.to_string(),
744                            },
745                            utils::generate_signature_error_msg(
746                                fun.name(),
747                                fun.signature(),
748                                &data_types
749                            )
750                        )
751                    })?
752                    .into_iter()
753                    .collect::<Vec<_>>();
754                let (_, function_name) = self.qualified_name();
755                let field_args = WindowUDFFieldArgs::new(&new_fields, &function_name);
756
757                udwf.field(field_args)
758                    .map(|field| (field.data_type().clone(), field.is_nullable()))
759            }
760        }
761    }
762}
763
764/// Cast subquery in InSubquery/ScalarSubquery to a given type.
765///
766/// 1. **Projection plan**: If the subquery is a projection (i.e. a SELECT statement with specific
767///    columns), it casts the first expression in the projection to the target type and creates a
768///    new projection with the casted expression.
769/// 2. **Non-projection plan**: If the subquery isn't a projection, it adds a projection to the plan
770///    with the casted first column.
771pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> Result<Subquery> {
772    if subquery.subquery.schema().field(0).data_type() == cast_to_type {
773        return Ok(subquery);
774    }
775
776    let plan = subquery.subquery.as_ref();
777    let new_plan = match plan {
778        LogicalPlan::Projection(projection) => {
779            let cast_expr = projection.expr[0]
780                .clone()
781                .cast_to(cast_to_type, projection.input.schema())?;
782            LogicalPlan::Projection(Projection::try_new(
783                vec![cast_expr],
784                Arc::clone(&projection.input),
785            )?)
786        }
787        _ => {
788            let cast_expr = Expr::Column(Column::from(plan.schema().qualified_field(0)))
789                .cast_to(cast_to_type, subquery.subquery.schema())?;
790            LogicalPlan::Projection(Projection::try_new(
791                vec![cast_expr],
792                subquery.subquery,
793            )?)
794        }
795    };
796    Ok(Subquery {
797        subquery: Arc::new(new_plan),
798        outer_ref_columns: subquery.outer_ref_columns,
799        spans: Spans::new(),
800    })
801}
802
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{col, lit, out_ref_col_with_metadata};

    use datafusion_common::{internal_err, DFSchema, ScalarValue};

    /// Applies the named unary predicate builder (e.g. `is_null`) to a NULL
    /// literal and asserts that the resulting expression is not nullable.
    macro_rules! test_is_expr_nullable {
        ($EXPR_TYPE:ident) => {{
            let predicate = lit(ScalarValue::Null).$EXPR_TYPE();
            let is_nullable = predicate.nullable(&MockExprSchema::new()).unwrap();
            assert!(!is_nullable);
        }};
    }

    /// Mock schema whose single field has the given type and nullability.
    fn mock_schema(data_type: DataType, nullable: bool) -> MockExprSchema {
        MockExprSchema::new()
            .with_data_type(data_type)
            .with_nullable(nullable)
    }

    #[test]
    fn expr_schema_nullability() {
        let comparison = col("foo").eq(lit(1));

        let non_nullable = MockExprSchema::new();
        assert!(!comparison.nullable(&non_nullable).unwrap());

        let nullable = MockExprSchema::new().with_nullable(true);
        assert!(comparison.nullable(&nullable).unwrap());

        test_is_expr_nullable!(is_null);
        test_is_expr_nullable!(is_not_null);
        test_is_expr_nullable!(is_true);
        test_is_expr_nullable!(is_not_true);
        test_is_expr_nullable!(is_false);
        test_is_expr_nullable!(is_not_false);
        test_is_expr_nullable!(is_unknown);
        test_is_expr_nullable!(is_not_unknown);
    }

    #[test]
    fn test_between_nullability() {
        let non_nullable = mock_schema(DataType::Int32, false);
        let nullable = mock_schema(DataType::Int32, true);

        // With literal bounds the result follows the column's nullability.
        let bounded = col("foo").between(lit(1), lit(2));
        assert!(!bounded.nullable(&non_nullable).unwrap());
        assert!(bounded.nullable(&nullable).unwrap());

        // A NULL bound on either side makes the result nullable.
        let null_bound = || lit(ScalarValue::Int32(None));
        let with_null_bounds = [
            col("foo").between(null_bound(), lit(2)),
            col("foo").between(lit(1), null_bound()),
            col("foo").between(null_bound(), null_bound()),
        ];
        for expr in with_null_bounds {
            assert!(expr.nullable(&non_nullable).unwrap());
        }
    }

    #[test]
    fn test_inlist_nullability() {
        let short_list = col("foo").in_list(vec![lit(1); 5], false);
        assert!(!short_list
            .nullable(&mock_schema(DataType::Int32, false))
            .unwrap());
        assert!(short_list
            .nullable(&mock_schema(DataType::Int32, true))
            .unwrap());

        // An error from the schema must surface through `nullable()`.
        let failing_schema =
            mock_schema(DataType::Int32, false).with_error_on_nullable(true);
        assert!(short_list.nullable(&failing_schema).is_err());

        // A NULL element in the list makes the result nullable.
        let with_null =
            col("foo").in_list(vec![lit(ScalarValue::Int32(None)), lit(1)], false);
        assert!(with_null
            .nullable(&mock_schema(DataType::Int32, false))
            .unwrap());

        // A six-element list is reported as nullable even though it holds no
        // NULL (presumably long lists are not inspected element by element).
        let long_list = col("foo").in_list(vec![lit(1); 6], false);
        assert!(long_list
            .nullable(&mock_schema(DataType::Int32, false))
            .unwrap());
    }

    #[test]
    fn test_like_nullability() {
        let non_nullable = mock_schema(DataType::Utf8, false);
        let nullable = mock_schema(DataType::Utf8, true);

        let literal_pattern = col("foo").like(lit("bar"));
        assert!(!literal_pattern.nullable(&non_nullable).unwrap());
        assert!(literal_pattern.nullable(&nullable).unwrap());

        // A NULL pattern makes the result nullable.
        let null_pattern = col("foo").like(lit(ScalarValue::Utf8(None)));
        assert!(null_pattern.nullable(&non_nullable).unwrap());
    }

    #[test]
    fn expr_schema_data_type() {
        let utf8_schema = MockExprSchema::new().with_data_type(DataType::Utf8);
        let resolved = col("foo").get_type(&utf8_schema).unwrap();
        assert_eq!(DataType::Utf8, resolved);
    }

    #[test]
    fn test_expr_metadata() {
        let raw_meta: HashMap<String, String> =
            HashMap::from([("bar".to_string(), "buzz".to_string())]);
        let meta = FieldMetadata::from(raw_meta);

        let column = col("foo");
        let mock = MockExprSchema::new()
            .with_data_type(DataType::Int32)
            .with_metadata(meta.clone());

        // col, alias, and cast should be metadata-preserving
        assert_eq!(meta, column.metadata(&mock).unwrap());

        let aliased = column.clone().alias("bar");
        assert_eq!(meta, aliased.metadata(&mock).unwrap());

        let casted = column.clone().cast_to(&DataType::Int64, &mock).unwrap();
        assert_eq!(meta, casted.metadata(&mock).unwrap());

        // verify to_field method populates metadata
        let foo_field = meta.add_to_field(Field::new("foo", DataType::Int32, true));
        let df_schema =
            DFSchema::from_unqualified_fields(vec![foo_field].into(), HashMap::new())
                .unwrap();
        assert_eq!(meta, column.metadata(&df_schema).unwrap());

        // outer ref constructed by `out_ref_col_with_metadata` should be
        // metadata-preserving
        let outer_ref = out_ref_col_with_metadata(
            DataType::Int32,
            meta.to_hashmap(),
            Column::from_name("foo"),
        );
        assert_eq!(meta, outer_ref.metadata(&df_schema).unwrap());
    }

    #[test]
    fn test_expr_placeholder() {
        let schema = MockExprSchema::new();

        let raw_meta: HashMap<String, String> =
            HashMap::from([("bar".to_string(), "buzz".to_string())]);
        let placeholder_meta = FieldMetadata::from(raw_meta);

        // Nullable placeholder field that carries metadata.
        let nullable_field = Field::new("", DataType::Utf8, true)
            .with_metadata(placeholder_meta.to_hashmap());
        let placeholder = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(nullable_field.into()),
        ));

        assert_eq!(
            placeholder.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, placeholder.metadata(&schema).unwrap());

        // Aliasing must keep type, nullability and metadata.
        let aliased = placeholder.alias("a placeholder by any other name");
        assert_eq!(
            aliased.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, true)
        );
        assert_eq!(placeholder_meta, aliased.metadata(&schema).unwrap());

        // Non-nullable placeholder field should remain non-nullable
        let strict_field = Field::new("", DataType::Utf8, false);
        let strict_placeholder = Expr::Placeholder(Placeholder::new_with_field(
            "".to_string(),
            Some(strict_field.into()),
        ));
        assert_eq!(
            strict_placeholder.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, false)
        );

        let strict_aliased = strict_placeholder.alias("a placeholder by any other name");
        assert_eq!(
            strict_aliased.data_type_and_nullable(&schema).unwrap(),
            (DataType::Utf8, false)
        );
    }

    /// Minimal `ExprSchema` used by the tests: every column resolves to the
    /// same single field, and `nullable` can be forced to fail.
    #[derive(Debug)]
    struct MockExprSchema {
        /// Field returned for every column lookup.
        field: Field,
        /// When set, `nullable` returns an internal error instead of a value.
        error_on_nullable: bool,
    }

    impl MockExprSchema {
        /// Starts from a non-nullable `Null`-typed field named `mock_field`.
        fn new() -> Self {
            let field = Field::new("mock_field", DataType::Null, false);
            Self {
                field,
                error_on_nullable: false,
            }
        }

        /// Replaces the nullability of the mock field.
        fn with_nullable(self, nullable: bool) -> Self {
            Self {
                field: self.field.with_nullable(nullable),
                ..self
            }
        }

        /// Replaces the data type of the mock field.
        fn with_data_type(self, data_type: DataType) -> Self {
            Self {
                field: self.field.with_data_type(data_type),
                ..self
            }
        }

        /// Controls whether `nullable` lookups fail.
        fn with_error_on_nullable(self, error_on_nullable: bool) -> Self {
            Self {
                error_on_nullable,
                ..self
            }
        }

        /// Attaches `metadata` to the mock field.
        fn with_metadata(self, metadata: FieldMetadata) -> Self {
            Self {
                field: metadata.add_to_field(self.field),
                ..self
            }
        }
    }

    impl ExprSchema for MockExprSchema {
        fn nullable(&self, _col: &Column) -> Result<bool> {
            if self.error_on_nullable {
                return internal_err!("nullable error");
            }
            Ok(self.field.is_nullable())
        }

        fn field_from_column(&self, _col: &Column) -> Result<&Field> {
            Ok(&self.field)
        }
    }
}