1use std::sync::Arc;
19
20use crate::expressions::LambdaExpr;
21use crate::ScalarFunctionExpr;
22use crate::{
23 expressions::{self, binary, like, similar_to, Column, Literal},
24 PhysicalExpr,
25};
26
27use arrow::datatypes::Schema;
28use datafusion_common::config::ConfigOptions;
29use datafusion_common::metadata::FieldMetadata;
30use datafusion_common::{
31 exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
32};
33use datafusion_expr::execution_props::ExecutionProps;
34use datafusion_expr::expr::{Alias, Cast, InList, Lambda, Placeholder, ScalarFunction};
35use datafusion_expr::var_provider::is_system_variables;
36use datafusion_expr::var_provider::VarType;
37use datafusion_expr::{
38 binary_expr, lit, Between, BinaryExpr, Expr, Like, Operator, TryCast,
39};
40
41pub fn create_physical_expr(
111 e: &Expr,
112 input_dfschema: &DFSchema,
113 execution_props: &ExecutionProps,
114) -> Result<Arc<dyn PhysicalExpr>> {
115 let input_schema = input_dfschema.as_arrow();
116
117 match e {
118 Expr::Alias(Alias { expr, metadata, .. }) => {
119 if let Expr::Literal(v, prior_metadata) = expr.as_ref() {
120 let new_metadata = FieldMetadata::merge_options(
121 prior_metadata.as_ref(),
122 metadata.as_ref(),
123 );
124 Ok(Arc::new(Literal::new_with_metadata(
125 v.clone(),
126 new_metadata,
127 )))
128 } else {
129 Ok(create_physical_expr(expr, input_dfschema, execution_props)?)
130 }
131 }
132 Expr::Column(c) => {
133 let idx = input_dfschema.index_of_column(c)?;
134 Ok(Arc::new(Column::new(&c.name, idx)))
135 }
136 Expr::Literal(value, metadata) => Ok(Arc::new(Literal::new_with_metadata(
137 value.clone(),
138 metadata.clone(),
139 ))),
140 Expr::ScalarVariable(_, variable_names) => {
141 if is_system_variables(variable_names) {
142 match execution_props.get_var_provider(VarType::System) {
143 Some(provider) => {
144 let scalar_value = provider.get_value(variable_names.clone())?;
145 Ok(Arc::new(Literal::new(scalar_value)))
146 }
147 _ => plan_err!("No system variable provider found"),
148 }
149 } else {
150 match execution_props.get_var_provider(VarType::UserDefined) {
151 Some(provider) => {
152 let scalar_value = provider.get_value(variable_names.clone())?;
153 Ok(Arc::new(Literal::new(scalar_value)))
154 }
155 _ => plan_err!("No user defined variable provider found"),
156 }
157 }
158 }
159 Expr::IsTrue(expr) => {
160 let binary_op = binary_expr(
161 expr.as_ref().clone(),
162 Operator::IsNotDistinctFrom,
163 lit(true),
164 );
165 create_physical_expr(&binary_op, input_dfschema, execution_props)
166 }
167 Expr::IsNotTrue(expr) => {
168 let binary_op =
169 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(true));
170 create_physical_expr(&binary_op, input_dfschema, execution_props)
171 }
172 Expr::IsFalse(expr) => {
173 let binary_op = binary_expr(
174 expr.as_ref().clone(),
175 Operator::IsNotDistinctFrom,
176 lit(false),
177 );
178 create_physical_expr(&binary_op, input_dfschema, execution_props)
179 }
180 Expr::IsNotFalse(expr) => {
181 let binary_op =
182 binary_expr(expr.as_ref().clone(), Operator::IsDistinctFrom, lit(false));
183 create_physical_expr(&binary_op, input_dfschema, execution_props)
184 }
185 Expr::IsUnknown(expr) => {
186 let binary_op = binary_expr(
187 expr.as_ref().clone(),
188 Operator::IsNotDistinctFrom,
189 Expr::Literal(ScalarValue::Boolean(None), None),
190 );
191 create_physical_expr(&binary_op, input_dfschema, execution_props)
192 }
193 Expr::IsNotUnknown(expr) => {
194 let binary_op = binary_expr(
195 expr.as_ref().clone(),
196 Operator::IsDistinctFrom,
197 Expr::Literal(ScalarValue::Boolean(None), None),
198 );
199 create_physical_expr(&binary_op, input_dfschema, execution_props)
200 }
201 Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
202 let lhs = create_physical_expr(left, input_dfschema, execution_props)?;
204 let rhs = create_physical_expr(right, input_dfschema, execution_props)?;
205 binary(lhs, *op, rhs, input_schema)
213 }
214 Expr::Like(Like {
215 negated,
216 expr,
217 pattern,
218 escape_char,
219 case_insensitive,
220 }) => {
221 if escape_char.unwrap_or('\\') != '\\' {
223 return exec_err!(
224 "LIKE does not support escape_char other than the backslash (\\)"
225 );
226 }
227 let physical_expr =
228 create_physical_expr(expr, input_dfschema, execution_props)?;
229 let physical_pattern =
230 create_physical_expr(pattern, input_dfschema, execution_props)?;
231 like(
232 *negated,
233 *case_insensitive,
234 physical_expr,
235 physical_pattern,
236 input_schema,
237 )
238 }
239 Expr::SimilarTo(Like {
240 negated,
241 expr,
242 pattern,
243 escape_char,
244 case_insensitive,
245 }) => {
246 if escape_char.is_some() {
247 return exec_err!("SIMILAR TO does not support escape_char yet");
248 }
249 let physical_expr =
250 create_physical_expr(expr, input_dfschema, execution_props)?;
251 let physical_pattern =
252 create_physical_expr(pattern, input_dfschema, execution_props)?;
253 similar_to(*negated, *case_insensitive, physical_expr, physical_pattern)
254 }
255 Expr::Case(case) => {
256 let expr: Option<Arc<dyn PhysicalExpr>> = if let Some(e) = &case.expr {
257 Some(create_physical_expr(
258 e.as_ref(),
259 input_dfschema,
260 execution_props,
261 )?)
262 } else {
263 None
264 };
265 let (when_expr, then_expr): (Vec<&Expr>, Vec<&Expr>) = case
266 .when_then_expr
267 .iter()
268 .map(|(w, t)| (w.as_ref(), t.as_ref()))
269 .unzip();
270 let when_expr =
271 create_physical_exprs(when_expr, input_dfschema, execution_props)?;
272 let then_expr =
273 create_physical_exprs(then_expr, input_dfschema, execution_props)?;
274 let when_then_expr: Vec<(Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>)> =
275 when_expr
276 .iter()
277 .zip(then_expr.iter())
278 .map(|(w, t)| (Arc::clone(w), Arc::clone(t)))
279 .collect();
280 let else_expr: Option<Arc<dyn PhysicalExpr>> =
281 if let Some(e) = &case.else_expr {
282 Some(create_physical_expr(
283 e.as_ref(),
284 input_dfschema,
285 execution_props,
286 )?)
287 } else {
288 None
289 };
290 Ok(expressions::case(expr, when_then_expr, else_expr)?)
291 }
292 Expr::Cast(Cast { expr, data_type }) => expressions::cast(
293 create_physical_expr(expr, input_dfschema, execution_props)?,
294 input_schema,
295 data_type.clone(),
296 ),
297 Expr::TryCast(TryCast { expr, data_type }) => expressions::try_cast(
298 create_physical_expr(expr, input_dfschema, execution_props)?,
299 input_schema,
300 data_type.clone(),
301 ),
302 Expr::Not(expr) => {
303 expressions::not(create_physical_expr(expr, input_dfschema, execution_props)?)
304 }
305 Expr::Negative(expr) => expressions::negative(
306 create_physical_expr(expr, input_dfschema, execution_props)?,
307 input_schema,
308 ),
309 Expr::IsNull(expr) => expressions::is_null(create_physical_expr(
310 expr,
311 input_dfschema,
312 execution_props,
313 )?),
314 Expr::IsNotNull(expr) => expressions::is_not_null(create_physical_expr(
315 expr,
316 input_dfschema,
317 execution_props,
318 )?),
319 Expr::Lambda { .. } => {
320 exec_err!("Expr::Lambda should be handled by Expr::ScalarFunction, as it can only exist within it")
321 }
322 Expr::ScalarFunction(ScalarFunction { func, args }) => {
323 let lambdas_schemas =
324 func.arguments_schema_from_logical_args(args, input_dfschema)?;
325
326 let physical_args = std::iter::zip(args, lambdas_schemas)
327 .map(|(expr, schema)| match expr {
328 Expr::Lambda(Lambda { params, body }) => {
329 Ok(Arc::new(LambdaExpr::new(
330 params.clone(),
331 create_physical_expr(body, &schema, execution_props)?,
332 )) as Arc<dyn PhysicalExpr>)
333 }
334 expr => create_physical_expr(expr, &schema, execution_props),
335 })
336 .collect::<Result<Vec<_>>>()?;
337
338 let config_options = match execution_props.config_options.as_ref() {
342 Some(config_options) => Arc::clone(config_options),
343 None => Arc::new(ConfigOptions::default()),
344 };
345
346 Ok(Arc::new(ScalarFunctionExpr::try_new(
347 Arc::clone(func),
348 physical_args,
349 input_schema,
350 config_options,
351 )?))
352 }
353 Expr::Between(Between {
354 expr,
355 negated,
356 low,
357 high,
358 }) => {
359 let value_expr = create_physical_expr(expr, input_dfschema, execution_props)?;
360 let low_expr = create_physical_expr(low, input_dfschema, execution_props)?;
361 let high_expr = create_physical_expr(high, input_dfschema, execution_props)?;
362
363 let binary_expr = binary(
365 binary(
366 Arc::clone(&value_expr),
367 Operator::GtEq,
368 low_expr,
369 input_schema,
370 )?,
371 Operator::And,
372 binary(
373 Arc::clone(&value_expr),
374 Operator::LtEq,
375 high_expr,
376 input_schema,
377 )?,
378 input_schema,
379 );
380
381 if *negated {
382 expressions::not(binary_expr?)
383 } else {
384 binary_expr
385 }
386 }
387 Expr::InList(InList {
388 expr,
389 list,
390 negated,
391 }) => match expr.as_ref() {
392 Expr::Literal(ScalarValue::Utf8(None), _) => {
393 Ok(expressions::lit(ScalarValue::Boolean(None)))
394 }
395 _ => {
396 let value_expr =
397 create_physical_expr(expr, input_dfschema, execution_props)?;
398
399 let list_exprs =
400 create_physical_exprs(list, input_dfschema, execution_props)?;
401 expressions::in_list(value_expr, list_exprs, negated, input_schema)
402 }
403 },
404 Expr::Placeholder(Placeholder { id, .. }) => {
405 exec_err!("Placeholder '{id}' was not provided a value for execution.")
406 }
407 other => {
408 not_impl_err!("Physical plan does not support logical expression {other:?}")
409 }
410 }
411}
412
413pub fn create_physical_exprs<'a, I>(
415 exprs: I,
416 input_dfschema: &DFSchema,
417 execution_props: &ExecutionProps,
418) -> Result<Vec<Arc<dyn PhysicalExpr>>>
419where
420 I: IntoIterator<Item = &'a Expr>,
421{
422 exprs
423 .into_iter()
424 .map(|expr| create_physical_expr(expr, input_dfschema, execution_props))
425 .collect()
426}
427
428pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
430 let df_schema = schema.clone().to_dfschema().unwrap();
432 let execution_props = ExecutionProps::new();
433 create_physical_expr(expr, &df_schema, &execution_props).unwrap()
434}
435
#[cfg(test)]
mod tests {
    use arrow::array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
    use arrow::datatypes::{DataType, Field};

    use datafusion_expr::{col, lit};

    use super::*;

    /// Plans `letter = 'A'` against a qualified schema and evaluates it on a
    /// four-row batch. Only the first row should match.
    #[test]
    fn test_create_physical_expr_scalar_input_output() -> Result<()> {
        let schema = Schema::new(vec![Field::new("letter", DataType::Utf8, false)]);
        let df_schema = DFSchema::try_from_qualified_schema("data", &schema)?;

        let predicate = col("letter").eq(lit("A"));
        let physical =
            create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;

        let letters: ArrayRef =
            Arc::new(StringArray::from_iter_values(["A", "B", "C", "D"]));
        let batch = RecordBatch::try_new(Arc::new(schema), vec![letters])?;

        let actual = physical
            .evaluate(&batch)?
            .into_array(batch.num_rows())
            .expect("Failed to convert to array");
        let expected: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, false, false]));
        assert_eq!(&actual, &expected);

        Ok(())
    }
}