datafusion_physical_expr/
planner.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::expressions::LambdaExpr;
21use crate::ScalarFunctionExpr;
22use crate::{
23    expressions::{self, binary, like, similar_to, Column, Literal},
24    PhysicalExpr,
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::metadata::FieldMetadata;
30use datafusion_common::{
31    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
32};
33use datafusion_expr::execution_props::ExecutionProps;
34use datafusion_expr::expr::{Alias, Cast, InList, Lambda, Placeholder, ScalarFunction};
35use datafusion_expr::var_provider::is_system_variables;
36use datafusion_expr::var_provider::VarType;
37use datafusion_expr::{
38    binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
39};
40
41/// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
42/// AS int)`.
43///
44/// [PhysicalExpr] are the physical counterpart to [Expr] used in logical
45/// planning, and can be evaluated directly on a [RecordBatch]. They are
46/// normally created from [Expr] by a [PhysicalPlanner] and can be created
47/// directly using [create_physical_expr].
48///
49/// A Physical expression knows its type, nullability and how to evaluate itself.
50///
51/// [PhysicalPlanner]: https://docs.rs/datafusion/latest/datafusion/physical_planner/trait.PhysicalPlanner.html
52/// [RecordBatch]: https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html
53///
54/// # Example: Create `PhysicalExpr` from `Expr`
55/// ```
56/// # use arrow::datatypes::{DataType, Field, Schema};
57/// # use datafusion_common::DFSchema;
58/// # use datafusion_expr::{Expr, col, lit};
59/// # use datafusion_physical_expr::create_physical_expr;
60/// # use datafusion_expr::execution_props::ExecutionProps;
61/// // For a logical expression `a = 1`, we can create a physical expression
62/// let expr = col("a").eq(lit(1));
63/// // To create a PhysicalExpr we need 1. a schema
64/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
65/// let df_schema = DFSchema::try_from(schema).unwrap();
66/// // 2. ExecutionProps
67/// let props = ExecutionProps::new();
68/// // We can now create a PhysicalExpr:
69/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
70/// ```
71///
72/// # Example: Executing a PhysicalExpr to obtain [ColumnarValue]
73/// ```
74/// # use std::sync::Arc;
75/// # use arrow::array::{cast::AsArray, BooleanArray, Int32Array, RecordBatch};
76/// # use arrow::datatypes::{DataType, Field, Schema};
77/// # use datafusion_common::{assert_batches_eq, DFSchema};
78/// # use datafusion_expr::{Expr, col, lit, ColumnarValue};
79/// # use datafusion_physical_expr::create_physical_expr;
80/// # use datafusion_expr::execution_props::ExecutionProps;
81/// # let expr = col("a").eq(lit(1));
82/// # let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
83/// # let df_schema = DFSchema::try_from(schema.clone()).unwrap();
84/// # let props = ExecutionProps::new();
85/// // Given a PhysicalExpr, for `a = 1` we can evaluate it against a RecordBatch like this:
86/// let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
87/// // Input of [1,2,3]
88/// let input_batch = RecordBatch::try_from_iter(vec![
89///   ("a", Arc::new(Int32Array::from(vec![1, 2, 3])) as _)
90/// ]).unwrap();
91/// // The result is a ColumnarValue (either an Array or a Scalar)
92/// let result = physical_expr.evaluate(&input_batch).unwrap();
93/// // In this case, a BooleanArray with the result of the comparison
94/// let ColumnarValue::Array(arr) = result else {
95///  panic!("Expected an array")
96/// };
97/// assert_eq!(arr.as_boolean(), &BooleanArray::from(vec![true, false, false]));
98/// ```
99///
100/// [ColumnarValue]: datafusion_expr::ColumnarValue
101///
102/// Create a physical expression from a logical expression ([Expr]).
103///
104/// # Arguments
105///
106/// * `e` - The logical expression
107/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
108///   to qualified or unqualified fields by name. Note that for creating a lambda, this must be
109///   scoped lambda schema, and not the outer schema
110pub fn create_physical_expr(
111    e: &Expr,
112    input_dfschema: &DFSchema,
113    execution_props: &ExecutionProps,
114) -> Result<Arc<dyn PhysicalExpr>> {
115    let input_schema = input_dfschema.as_arrow();
116
117    match e {
118        Expr::Alias(Alias { expr, metadata, .. }) => {
119            if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
120                let new_metadata = FieldMetadata::merge_options(
121                    prior_metadata.as_ref(),
122                    metadata.as_ref(),
123                );
124                Ok(Arc::new(Literal::new_with_metadata(
125                    v.clone(),
126                    new_metadata,
127                )))
128            } else {
129                Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
130            }
131        }
132        Expr::Column(c) => {
133            let idx = input_dfschema.index_of_column(c)?;
134            Ok(Arc::new(Column::new(&c.name, idx)))
135        }
136        Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
137            value.clone(),
138            metadata.clone(),
139        ))),
140        Expr::ScalarVariable(_, variable_names) => {
141            if is_system_variables(variable_names) {
142                match execution_props.get_var_provider(VarType::System) {
143                    Some(provider) => {
144                        let scalar_value = provider.get_value(variable_names.clone())?;
145                        Ok(Arc::new(Literal::new(scalar_value)))
146                    }
147                    _ => plan_err!("No system variable provider found"),
148                }
149            } else {
150                match execution_props.get_var_provider(VarType::UserDefined) {
151                    Some(provider) => {
152                        let scalar_value = provider.get_value(variable_names.clone())?;
153                        Ok(Arc::new(Literal::new(scalar_value)))
154                    }
155                    _ => plan_err!("No user defined variable provider found"),
156                }
157            }
158        }
159        Expr::IsTrue(expr) => {
160            let binary_op = binary_expr(
161                expr.as_ref().clone(),
162                Operator::IsNotDistinctFrom,
163                lit(true),
164            );
165            create_physical_expr(&binary_op, input_dfschema, execution_props)
166        }
167        Expr::IsNotTrue(expr) => {
168            let binary_op =
169                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
170            create_physical_expr(&binary_op, input_dfschema, execution_props)
171        }
172        Expr::IsFalse(expr) => {
173            let binary_op = binary_expr(
174                expr.as_ref().clone(),
175                Operator::IsNotDistinctFrom,
176                lit(false),
177            );
178            create_physical_expr(&binary_op, input_dfschema, execution_props)
179        }
180        Expr::IsNotFalse(expr) => {
181            let binary_op =
182                binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
183            create_physical_expr(&binary_op, input_dfschema, execution_props)
184        }
185        Expr::IsUnknown(expr) => {
186            let binary_op = binary_expr(
187                expr.as_ref().clone(),
188                Operator::IsNotDistinctFrom,
189                Expr::Literal(ScalarValue::Boolean(None), None),
190            );
191            create_physical_expr(&binary_op, input_dfschema, execution_props)
192        }
193        Expr::IsNotUnknown(expr) => {
194            let binary_op = binary_expr(
195                expr.as_ref().clone(),
196                Operator::IsDistinctFrom,
197                Expr::Literal(ScalarValue::Boolean(None), None),
198            );
199            create_physical_expr(&binary_op, input_dfschema, execution_props)
200        }
201        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
202            // Create physical expressions for left and right operands
203            let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
204            let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
205            // Note that the logical planner is responsible
206            // for type coercion on the arguments (e.g. if one
207            // argument was originally Int32 and one was
208            // Int64 they will both be coerced to Int64).
209            //
210            // There should be no coercion during physical
211            // planning.
212            binary(lhs, *op, rhs, input_schema)
213        }
214        Expr::Like(Like {
215            negated,
216            expr,
217            pattern,
218            escape_char,
219            case_insensitive,
220        }) => {
221            // `\` is the implicit escape, see https://github.com/apache/datafusion/issues/13291
222            if escape_char.unwrap_or('\\') != '\\' {
223                return exec_err!(
224                    "LIKE does not support escape_char other than the backslash (\\)"
225                );
226            }
227            let physical_expr =
228                create_physical_expr(expr, input_dfschema, execution_props)?;
229            let physical_pattern =
230                create_physical_expr(pattern, input_dfschema, execution_props)?;
231            like(
232                *negated,
233                *case_insensitive,
234                physical_expr,
235                physical_pattern,
236                input_schema,
237            )
238        }
239        Expr::SimilarTo(Like {
240            negated,
241            expr,
242            pattern,
243            escape_char,
244            case_insensitive,
245        }) => {
246            if escape_char.is_some() {
247                return exec_err!("SIMILAR TO does not support escape_char yet");
248            }
249            let physical_expr =
250                create_physical_expr(expr, input_dfschema, execution_props)?;
251            let physical_pattern =
252                create_physical_expr(pattern, input_dfschema, execution_props)?;
253            similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
254        }
255        Expr::Case(case) => {
256            let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
257                Some(create_physical_expr(
258                    e.as_ref(),
259                    input_dfschema,
260                    execution_props,
261                )?)
262            } else {
263                None
264            };
265            let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
266                .when_then_expr
267                .iter()
268                .map(|(w, t)| (w.as_ref(), t.as_ref()))
269                .unzip();
270            let when_expr =
271                create_physical_exprs(when_expr, input_dfschema, execution_props)?;
272            let then_expr =
273                create_physical_exprs(then_expr, input_dfschema, execution_props)?;
274            let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
275                when_expr
276                    .iter()
277                    .zip(then_expr.iter())
278                    .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
279                    .collect();
280            let else_expr: Option<Arc<dyn PhysicalExpr>> =
281                if let Some(e) = &case.else_expr {
282                    Some(create_physical_expr(
283                        e.as_ref(),
284                        input_dfschema,
285                        execution_props,
286                    )?)
287                } else {
288                    None
289                };
290            Ok(expressions::case(expr, when_then_expr, else_expr)?)
291        }
292        Expr::Cast(Cast { expr, data_type }) => expressions::cast(
293            create_physical_expr(expr, input_dfschema, execution_props)?,
294            input_schema,
295            data_type.clone(),
296        ),
297        Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
298            create_physical_expr(expr, input_dfschema, execution_props)?,
299            input_schema,
300            data_type.clone(),
301        ),
302        Expr::Not(expr) => {
303            expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
304        }
305        Expr::Negative(expr) => expressions::negative(
306            create_physical_expr(expr, input_dfschema, execution_props)?,
307            input_schema,
308        ),
309        Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
310            expr,
311            input_dfschema,
312            execution_props,
313        )?),
314        Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
315            expr,
316            input_dfschema,
317            execution_props,
318        )?),
319        Expr::Lambda { .. } => {
320            exec_err!("Expr::Lambda should be handled by Expr::ScalarFunction, as it can only exist within it")
321        }
322        Expr::ScalarFunction(ScalarFunction { func, args }) => {
323            let lambdas_schemas =
324                func.arguments_schema_from_logical_args(args, input_dfschema)?;
325
326            let physical_args = std::iter::zip(args, lambdas_schemas)
327                .map(|(expr, schema)| match expr {
328                    Expr::Lambda(Lambda { params, body }) => {
329                        Ok(Arc::new(LambdaExpr::new(
330                            params.clone(),
331                            create_physical_expr(body, &schema, execution_props)?,
332                        )) as Arc<dyn PhysicalExpr>)
333                    }
334                    expr => create_physical_expr(expr, &schema, execution_props),
335                })
336                .collect::<Result<Vec<_>>>()?;
337
338            //let physical_args =
339            //    create_physical_exprs(args, input_dfschema, execution_props)?;
340
341            let config_options = match execution_props.config_options.as_ref() {
342                Some(config_options) => Arc::clone(config_options),
343                None => Arc::new(ConfigOptions::default()),
344            };
345
346            Ok(Arc::new(ScalarFunctionExpr::try_new(
347                Arc::clone(func),
348                physical_args,
349                input_schema,
350                config_options,
351            )?))
352        }
353        Expr::Between(Between {
354            expr,
355            negated,
356            low,
357            high,
358        }) => {
359            let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
360            let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
361            let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
362
363            // rewrite the between into the two binary operators
364            let binary_expr = binary(
365                binary(
366                    Arc::clone(&value_expr),
367                    Operator::GtEq,
368                    low_expr,
369                    input_schema,
370                )?,
371                Operator::And,
372                binary(
373                    Arc::clone(&value_expr),
374                    Operator::LtEq,
375                    high_expr,
376                    input_schema,
377                )?,
378                input_schema,
379            );
380
381            if *negated {
382                expressions::not(binary_expr?)
383            } else {
384                binary_expr
385            }
386        }
387        Expr::InList(InList {
388            expr,
389            list,
390            negated,
391        }) => match expr.as_ref() {
392            Expr::Literal(ScalarValue::Utf8(None), _) => {
393                Ok(expressions::lit(ScalarValue::Boolean(None)))
394            }
395            _ => {
396                let value_expr =
397                    create_physical_expr(expr, input_dfschema, execution_props)?;
398
399                let list_exprs =
400                    create_physical_exprs(list, input_dfschema, execution_props)?;
401                expressions::in_list(value_expr, list_exprs, negated, input_schema)
402            }
403        },
404        Expr::Placeholder(Placeholder { id, .. }) => {
405            exec_err!("Placeholder '{id}' was not provided a value for execution.")
406        }
407        other => {
408            not_impl_err!("Physical plan does not support logical expression {other:?}")
409        }
410    }
411}
412
413/// Create vector of Physical Expression from a vector of logical expression
414pub fn create_physical_exprs<'a, I>(
415    exprs: I,
416    input_dfschema: &DFSchema,
417    execution_props: &ExecutionProps,
418) -> Result<Vec<Arc<dyn PhysicalExpr>>>
419where
420    I: IntoIterator<Item = &'a Expr>,
421{
422    exprs
423        .into_iter()
424        .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
425        .collect()
426}
427
428/// Convert a logical expression to a physical expression (without any simplification, etc)
429pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
430    // TODO this makes a deep copy of the Schema. Should take SchemaRef instead and avoid deep copy
431    let df_schema = schema.clone().to_dfschema().unwrap();
432    let execution_props = ExecutionProps::new();
433    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
434}
435
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and checks that
    /// evaluating it on a four-row batch marks only the first row as true.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        // Setup: a single non-nullable Utf8 column, qualified as `data.letter`.
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical_predicate =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef = Arc::new(StringArray::from_iter_values(vec![
            "A", "B", "C", "D",
        ]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        // Act: evaluate and materialize the result as an array of 4 rows.
        let actual = physical_predicate
            .evaluate(&batch)?
            .into_array(4)
            .expect("Failed to convert to array");

        // Assert: only the row holding "A" matches.
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}