1use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19
20use arrow::datatypes::DataType;
21use datafusion_common::{
22 internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
23 DFSchema, Dependency, Diagnostic, Result, Span,
24};
25use datafusion_expr::expr::{Lambda, ScalarFunction, Unnest};
26use datafusion_expr::expr::{NullTreatment, WildcardOptions, WindowFunction};
27use datafusion_expr::planner::PlannerResult;
28use datafusion_expr::planner::{RawAggregateExpr, RawWindowExpr};
29use datafusion_expr::{expr, Expr, ExprSchemable, WindowFrame, WindowFunctionDefinition};
30use sqlparser::ast::{
31 DuplicateTreatment, Expr as SQLExpr, Function as SQLFunction, FunctionArg,
32 FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments,
33 ObjectName, OrderByExpr, Spanned, WindowType,
34};
35
36pub fn suggest_valid_function(
41 input_function_name: &str,
42 is_window_func: bool,
43 ctx: &dyn ContextProvider,
44) -> Option<String> {
45 let valid_funcs = if is_window_func {
46 let mut funcs = Vec::new();
48
49 funcs.extend(ctx.udaf_names());
50 funcs.extend(ctx.udwf_names());
51
52 funcs
53 } else {
54 let mut funcs = Vec::new();
56
57 funcs.extend(ctx.udf_names());
58 funcs.extend(ctx.udaf_names());
59
60 funcs
61 };
62 find_closest_match(valid_funcs, input_function_name)
63}
64
65fn find_closest_match(candidates: Vec<String>, target: &str) -> Option<String> {
68 let target = target.to_lowercase();
69 candidates.into_iter().min_by_key(|candidate| {
70 datafusion_common::utils::datafusion_strsim::levenshtein(
71 &candidate.to_lowercase(),
72 &target,
73 )
74 })
75}
76
/// The parts of a parsed SQL function call that the planner consumes, produced by
/// `FunctionArgs::try_new` from a `sqlparser` `Function` node.
#[derive(Debug)]
struct FunctionArgs {
    /// Function name exactly as it appeared in the query (possibly multi-part).
    name: ObjectName,
    /// Arguments written inside the parentheses; empty when the call had no
    /// parenthesized argument list.
    args: Vec<FunctionArg>,
    /// `ORDER BY` clause written inside the argument list, or empty if absent.
    order_by: Vec<OrderByExpr>,
    /// `OVER` clause of the call, if any.
    over: Option<WindowType>,
    /// `FILTER` clause expression of the call, if any.
    filter: Option<Box<SQLExpr>>,
    /// `IGNORE NULLS` / `RESPECT NULLS`, taken either from the call itself or from
    /// a clause inside the argument list (duplicates are rejected).
    null_treatment: Option<NullTreatment>,
    /// `true` when the argument list was prefixed with `DISTINCT`.
    distinct: bool,
    /// Ordering from a `WITHIN GROUP (ORDER BY ...)` clause; `try_new` only
    /// accepts zero or one expression here for parenthesized argument lists.
    within_group: Vec<OrderByExpr>,
    /// `true` when the call was written without any parentheses
    /// (`FunctionArguments::None`), e.g. a bare name parsed as a function.
    function_without_parentheses: bool,
}
99
100impl FunctionArgs {
101 fn try_new(function: SQLFunction) -> Result<Self> {
102 let SQLFunction {
103 name,
104 args,
105 over,
106 filter,
107 mut null_treatment,
108 within_group,
109 ..
110 } = function;
111
112 let FunctionArguments::List(args) = args else {
114 return Ok(Self {
115 name,
116 args: vec![],
117 order_by: vec![],
118 over,
119 filter,
120 null_treatment: null_treatment.map(|v| v.into()),
121 distinct: false,
122 within_group,
123 function_without_parentheses: matches!(args, FunctionArguments::None),
124 });
125 };
126
127 let FunctionArgumentList {
128 duplicate_treatment,
129 args,
130 clauses,
131 } = args;
132
133 let distinct = match duplicate_treatment {
134 Some(DuplicateTreatment::Distinct) => true,
135 Some(DuplicateTreatment::All) => false,
136 None => false,
137 };
138
139 let mut order_by = None;
141 for clause in clauses {
142 match clause {
143 FunctionArgumentClause::IgnoreOrRespectNulls(nt) => {
144 if null_treatment.is_some() {
145 return not_impl_err!(
146 "Calling {name}: Duplicated null treatment clause"
147 );
148 }
149 null_treatment = Some(nt);
150 }
151 FunctionArgumentClause::OrderBy(oby) => {
152 if order_by.is_some() {
153 if !within_group.is_empty() {
154 return plan_err!("ORDER BY clause is only permitted in WITHIN GROUP clause when a WITHIN GROUP is used");
155 }
156 return not_impl_err!("Calling {name}: Duplicated ORDER BY clause in function arguments");
157 }
158 order_by = Some(oby);
159 }
160 FunctionArgumentClause::Limit(limit) => {
161 return not_impl_err!(
162 "Calling {name}: LIMIT not supported in function arguments: {limit}"
163 )
164 }
165 FunctionArgumentClause::OnOverflow(overflow) => {
166 return not_impl_err!(
167 "Calling {name}: ON OVERFLOW not supported in function arguments: {overflow}"
168 )
169 }
170 FunctionArgumentClause::Having(having) => {
171 return not_impl_err!(
172 "Calling {name}: HAVING not supported in function arguments: {having}"
173 )
174 }
175 FunctionArgumentClause::Separator(sep) => {
176 return not_impl_err!(
177 "Calling {name}: SEPARATOR not supported in function arguments: {sep}"
178 )
179 }
180 FunctionArgumentClause::JsonNullClause(jn) => {
181 return not_impl_err!(
182 "Calling {name}: JSON NULL clause not supported in function arguments: {jn}"
183 )
184 }
185 FunctionArgumentClause::JsonReturningClause(jr) => {
186 return not_impl_err!(
187 "Calling {name}: JSON RETURNING clause not supported in function arguments: {jr}"
188 )
189 },
190 }
191 }
192
193 if within_group.len() > 1 {
194 return not_impl_err!(
195 "Only a single ordering expression is permitted in a WITHIN GROUP clause"
196 );
197 }
198
199 let order_by = order_by.unwrap_or_default();
200
201 Ok(Self {
202 name,
203 args,
204 order_by,
205 over,
206 filter,
207 null_treatment: null_treatment.map(|v| v.into()),
208 distinct,
209 within_group,
210 function_without_parentheses: false,
211 })
212 }
213}
214
215impl<S: ContextProvider> SqlToRel<'_, S> {
216 pub(super) fn sql_function_to_expr(
217 &self,
218 function: SQLFunction,
219 schema: &DFSchema,
220 planner_context: &mut PlannerContext,
221 ) -> Result<Expr> {
222 let function_args = FunctionArgs::try_new(function)?;
223 let FunctionArgs {
224 name: object_name,
225 args,
226 order_by,
227 over,
228 filter,
229 null_treatment,
230 distinct,
231 within_group,
232 function_without_parentheses,
233 } = function_args;
234
235 if over.is_some() && !within_group.is_empty() {
236 return plan_err!("OVER and WITHIN GROUP clause cannot be used together. \
237 OVER is for window functions, whereas WITHIN GROUP is for ordered set aggregate functions");
238 }
239
240 if !order_by.is_empty() && !within_group.is_empty() {
241 return plan_err!("ORDER BY and WITHIN GROUP clauses cannot be used together in the same aggregate function");
242 }
243
244 let is_function_window = over.is_some();
248 let sql_parser_span = object_name.0[0].span();
249 let name = if object_name.0.len() > 1 {
250 object_name.to_string()
253 } else {
254 match object_name.0[0].as_ident() {
255 Some(ident) => crate::utils::normalize_ident(ident.clone()),
256 None => {
257 return plan_err!(
258 "Expected an identifier in function name, but found {:?}",
259 object_name.0[0]
260 )
261 }
262 }
263 };
264
265 if name.eq("make_map") {
266 let mut fn_args =
267 self.function_args_to_expr(args.clone(), schema, planner_context)?;
268 for planner in self.context_provider.get_expr_planners().iter() {
269 match planner.plan_make_map(fn_args)? {
270 PlannerResult::Planned(expr) => return Ok(expr),
271 PlannerResult::Original(args) => fn_args = args,
272 }
273 }
274 }
275 if let Some(fm) = self.context_provider.get_function_meta(&name) {
277 let (args, arg_names) =
278 self.function_args_to_expr_with_names(args, schema, planner_context)?;
279
280 let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
281 if let Some(param_names) = &fm.signature().parameter_names {
282 datafusion_expr::arguments::resolve_function_arguments(
283 param_names,
284 args,
285 arg_names,
286 )?
287 } else {
288 return plan_err!(
289 "Function '{}' does not support named arguments",
290 fm.name()
291 );
292 }
293 } else {
294 args
295 };
296
297 let inner = ScalarFunction::new_udf(fm, resolved_args);
299
300 if name.eq_ignore_ascii_case(inner.name()) {
301 return Ok(Expr::ScalarFunction(inner));
302 } else {
303 let arg_names = inner
307 .args
308 .iter()
309 .map(|arg| arg.to_string())
310 .collect::<Vec<_>>()
311 .join(",");
312 let verbose_alias = format!("{name}({arg_names})");
313
314 return Ok(Expr::ScalarFunction(inner).alias(verbose_alias));
315 }
316 }
317
318 if name.eq("unnest") {
320 let mut exprs = self.function_args_to_expr(args, schema, planner_context)?;
321 if exprs.len() != 1 {
322 return plan_err!("unnest() requires exactly one argument");
323 }
324 let expr = exprs.swap_remove(0);
325 Self::check_unnest_arg(&expr, schema)?;
326 return Ok(Expr::Unnest(Unnest::new(expr)));
327 }
328
329 if !order_by.is_empty() && is_function_window {
330 return plan_err!(
331 "Aggregate ORDER BY is not implemented for window functions"
332 );
333 }
334 if let Some(WindowType::WindowSpec(window)) = over {
336 let partition_by = window
337 .partition_by
338 .into_iter()
339 .filter(|e| !matches!(e, sqlparser::ast::Expr::Value { .. },))
342 .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
343 .collect::<Result<Vec<_>>>()?;
344 let mut order_by = self.order_by_to_sort_expr(
345 window.order_by,
346 schema,
347 planner_context,
348 false,
350 None,
351 )?;
352
353 let func_deps = schema.functional_dependencies();
354 let is_ordering_strict = order_by.iter().find_map(|orderby_expr| {
356 if let Expr::Column(col) = &orderby_expr.expr {
357 let idx = schema.index_of_column(col).ok()?;
358 return if func_deps.iter().any(|dep| {
359 dep.source_indices == vec![idx] && dep.mode == Dependency::Single
360 }) {
361 Some(true)
362 } else {
363 Some(false)
364 };
365 }
366 Some(false)
367 });
368
369 let window_frame = window
370 .window_frame
371 .as_ref()
372 .map(|window_frame| {
373 let window_frame: WindowFrame = window_frame.clone().try_into()?;
374 window_frame
375 .regularize_order_bys(&mut order_by)
376 .map(|_| window_frame)
377 })
378 .transpose()?;
379
380 let window_frame = if let Some(window_frame) = window_frame {
381 window_frame
382 } else if let Some(is_ordering_strict) = is_ordering_strict {
383 WindowFrame::new(Some(is_ordering_strict))
384 } else {
385 WindowFrame::new((!order_by.is_empty()).then_some(false))
386 };
387
388 if let Ok(fun) = self.find_window_func(&name) {
389 let (args, arg_names) =
390 self.function_args_to_expr_with_names(args, schema, planner_context)?;
391
392 let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
393 let signature = match &fun {
394 WindowFunctionDefinition::AggregateUDF(udaf) => udaf.signature(),
395 WindowFunctionDefinition::WindowUDF(udwf) => udwf.signature(),
396 };
397
398 if let Some(param_names) = &signature.parameter_names {
399 datafusion_expr::arguments::resolve_function_arguments(
400 param_names,
401 args,
402 arg_names,
403 )?
404 } else {
405 return plan_err!(
406 "Window function '{}' does not support named arguments",
407 name
408 );
409 }
410 } else {
411 args
412 };
413
414 let filter = filter
416 .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
417 .transpose()?
418 .map(Box::new);
419
420 let mut window_expr = RawWindowExpr {
421 func_def: fun,
422 args: resolved_args,
423 partition_by,
424 order_by,
425 window_frame,
426 filter,
427 null_treatment,
428 distinct: function_args.distinct,
429 };
430
431 for planner in self.context_provider.get_expr_planners().iter() {
432 match planner.plan_window(window_expr)? {
433 PlannerResult::Planned(expr) => return Ok(expr),
434 PlannerResult::Original(expr) => window_expr = expr,
435 }
436 }
437
438 let RawWindowExpr {
439 func_def,
440 args,
441 partition_by,
442 order_by,
443 window_frame,
444 filter,
445 null_treatment,
446 distinct,
447 } = window_expr;
448
449 let inner = WindowFunction {
450 fun: func_def,
451 params: expr::WindowFunctionParams {
452 args,
453 partition_by,
454 order_by,
455 window_frame,
456 filter,
457 null_treatment,
458 distinct,
459 },
460 };
461
462 if name.eq_ignore_ascii_case(inner.fun.name()) {
463 return Ok(Expr::WindowFunction(Box::new(inner)));
464 } else {
465 let arg_names = inner
469 .params
470 .args
471 .iter()
472 .map(|arg| arg.to_string())
473 .collect::<Vec<_>>()
474 .join(",");
475 let verbose_alias = format!("{name}({arg_names})");
476
477 return Ok(Expr::WindowFunction(Box::new(inner)).alias(verbose_alias));
478 }
479 }
480 } else {
481 if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
483 if null_treatment.is_some() && !fm.supports_null_handling_clause() {
484 return plan_err!(
485 "[IGNORE | RESPECT] NULLS are not permitted for {}",
486 fm.name()
487 );
488 }
489
490 let (mut args, mut arg_names) =
491 self.function_args_to_expr_with_names(args, schema, planner_context)?;
492
493 let order_by = if fm.supports_within_group_clause() {
494 let within_group = self.order_by_to_sort_expr(
495 within_group,
496 schema,
497 planner_context,
498 false,
499 None,
500 )?;
501
502 if !within_group.is_empty() {
505 let within_group_count = within_group.len();
507 arg_names = std::iter::repeat_n(None, within_group_count)
508 .chain(arg_names)
509 .collect();
510
511 args = within_group
512 .iter()
513 .map(|sort| sort.expr.clone())
514 .chain(args)
515 .collect::<Vec<_>>();
516 }
517 within_group
518 } else {
519 let order_by = if !order_by.is_empty() {
520 order_by
521 } else {
522 within_group
523 };
524 self.order_by_to_sort_expr(
525 order_by,
526 schema,
527 planner_context,
528 true,
529 None,
530 )?
531 };
532
533 let filter: Option<Box<Expr>> = filter
534 .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
535 .transpose()?
536 .map(Box::new);
537
538 let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
539 if let Some(param_names) = &fm.signature().parameter_names {
540 datafusion_expr::arguments::resolve_function_arguments(
541 param_names,
542 args,
543 arg_names,
544 )?
545 } else {
546 return plan_err!(
547 "Aggregate function '{}' does not support named arguments",
548 fm.name()
549 );
550 }
551 } else {
552 args
553 };
554
555 let mut aggregate_expr = RawAggregateExpr {
556 func: fm,
557 args: resolved_args,
558 distinct,
559 filter,
560 order_by,
561 null_treatment,
562 };
563 for planner in self.context_provider.get_expr_planners().iter() {
564 match planner.plan_aggregate(aggregate_expr)? {
565 PlannerResult::Planned(expr) => return Ok(expr),
566 PlannerResult::Original(expr) => aggregate_expr = expr,
567 }
568 }
569
570 let RawAggregateExpr {
571 func,
572 args,
573 distinct,
574 filter,
575 order_by,
576 null_treatment,
577 } = aggregate_expr;
578
579 let inner = expr::AggregateFunction::new_udf(
580 func,
581 args,
582 distinct,
583 filter,
584 order_by,
585 null_treatment,
586 );
587
588 if name.eq_ignore_ascii_case(inner.func.name()) {
589 return Ok(Expr::AggregateFunction(inner));
590 } else {
591 let arg_names = inner
595 .params
596 .args
597 .iter()
598 .map(|arg| arg.to_string())
599 .collect::<Vec<_>>()
600 .join(",");
601 let verbose_alias = format!("{name}({arg_names})");
602
603 return Ok(Expr::AggregateFunction(inner).alias(verbose_alias));
604 }
605 }
606 }
607
608 if function_without_parentheses {
610 let maybe_ids = object_name
611 .0
612 .iter()
613 .map(|part| part.as_ident().cloned().ok_or(()))
614 .collect::<Result<Vec<_>, ()>>();
615 if let Ok(ids) = maybe_ids {
616 if ids.len() == 1 {
617 return self.sql_identifier_to_expr(
618 ids.into_iter().next().unwrap(),
619 schema,
620 planner_context,
621 );
622 } else {
623 return self.sql_compound_identifier_to_expr(
624 ids,
625 schema,
626 planner_context,
627 );
628 }
629 }
630 }
631
632 if let Some(suggested_func_name) =
634 suggest_valid_function(&name, is_function_window, self.context_provider)
635 {
636 let span = Span::try_from_sqlparser_span(sql_parser_span);
637 let mut diagnostic =
638 Diagnostic::new_error(format!("Invalid function '{name}'"), span);
639 diagnostic
640 .add_note(format!("Possible function '{suggested_func_name}'"), None);
641 plan_err!("Invalid function '{name}'.\nDid you mean '{suggested_func_name}'?"; diagnostic=diagnostic)
642 } else {
643 internal_err!("No functions registered with this context.")
644 }
645 }
646
647 pub(super) fn sql_fn_name_to_expr(
648 &self,
649 expr: SQLExpr,
650 fn_name: &str,
651 schema: &DFSchema,
652 planner_context: &mut PlannerContext,
653 ) -> Result<Expr> {
654 let fun = self
655 .context_provider
656 .get_function_meta(fn_name)
657 .ok_or_else(|| {
658 internal_datafusion_err!("Unable to find expected '{fn_name}' function")
659 })?;
660 let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
661 Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
662 }
663
664 pub(super) fn find_window_func(
665 &self,
666 name: &str,
667 ) -> Result<WindowFunctionDefinition> {
668 let udaf = self.context_provider.get_aggregate_meta(name);
670 if udaf.as_ref().is_some_and(|udaf| {
672 udaf.name() != "first_value"
673 && udaf.name() != "last_value"
674 && udaf.name() != "nth_value"
675 }) {
676 Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap()))
677 } else {
678 self.context_provider
679 .get_window_meta(name)
680 .map(WindowFunctionDefinition::WindowUDF)
681 .ok_or_else(|| {
682 plan_datafusion_err!("There is no window function named {name}")
683 })
684 }
685 }
686
687 fn sql_fn_arg_to_logical_expr(
688 &self,
689 sql: FunctionArg,
690 schema: &DFSchema,
691 planner_context: &mut PlannerContext,
692 ) -> Result<Expr> {
693 let (expr, _) =
694 self.sql_fn_arg_to_logical_expr_with_name(sql, schema, planner_context)?;
695 Ok(expr)
696 }
697
698 fn sql_fn_arg_to_logical_expr_with_name(
699 &self,
700 sql: FunctionArg,
701 schema: &DFSchema,
702 planner_context: &mut PlannerContext,
703 ) -> Result<(Expr, Option<String>)> {
704 match sql {
705 FunctionArg::Named {
706 name,
707 arg: FunctionArgExpr::Expr(arg),
708 operator: _,
709 } => {
710 let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
711 let arg_name = crate::utils::normalize_ident(name);
712 Ok((expr, Some(arg_name)))
713 }
714 FunctionArg::Named {
715 name,
716 arg: FunctionArgExpr::Wildcard,
717 operator: _,
718 } => {
719 #[expect(deprecated)]
720 let expr = Expr::Wildcard {
721 qualifier: None,
722 options: Box::new(WildcardOptions::default()),
723 };
724 let arg_name = crate::utils::normalize_ident(name);
725 Ok((expr, Some(arg_name)))
726 }
727 FunctionArg::Unnamed(FunctionArgExpr::Expr(SQLExpr::Lambda(
728 sqlparser::ast::LambdaFunction { params, body },
729 ))) => {
730 let params = params
731 .into_iter()
732 .map(|v| v.to_string())
733 .collect::<Vec<_>>();
734
735 Ok((
736 Expr::Lambda(Lambda {
737 params: params.clone(),
738 body: Box::new(self.sql_expr_to_logical_expr(
739 *body,
740 schema,
741 &mut planner_context.clone().with_lambda_parameters(params),
742 )?),
743 }),
744 None,
745 ))
746 }
747 FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
748 let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
749 Ok((expr, None))
750 }
751 FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
752 #[expect(deprecated)]
753 let expr = Expr::Wildcard {
754 qualifier: None,
755 options: Box::new(WildcardOptions::default()),
756 };
757 Ok((expr, None))
758 }
759 FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(object_name)) => {
760 let qualifier = self.object_name_to_table_reference(object_name)?;
761 let qualified_indices = schema.fields_indices_with_qualified(&qualifier);
763 if qualified_indices.is_empty() {
764 return plan_err!("Invalid qualifier {qualifier}");
765 }
766
767 #[expect(deprecated)]
768 let expr = Expr::Wildcard {
769 qualifier: qualifier.into(),
770 options: Box::new(WildcardOptions::default()),
771 };
772 Ok((expr, None))
773 }
774 FunctionArg::ExprNamed {
776 name: SQLExpr::Identifier(name),
777 arg: FunctionArgExpr::Expr(arg),
778 operator: _,
779 } => {
780 let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
781 let arg_name = crate::utils::normalize_ident(name);
782 Ok((expr, Some(arg_name)))
783 }
784 FunctionArg::ExprNamed {
785 name: SQLExpr::Identifier(name),
786 arg: FunctionArgExpr::Wildcard,
787 operator: _,
788 } => {
789 #[expect(deprecated)]
790 let expr = Expr::Wildcard {
791 qualifier: None,
792 options: Box::new(WildcardOptions::default()),
793 };
794 let arg_name = crate::utils::normalize_ident(name);
795 Ok((expr, Some(arg_name)))
796 }
797 _ => not_impl_err!("Unsupported qualified wildcard argument: {sql:?}"),
798 }
799 }
800
801 pub(super) fn function_args_to_expr(
802 &self,
803 args: Vec<FunctionArg>,
804 schema: &DFSchema,
805 planner_context: &mut PlannerContext,
806 ) -> Result<Vec<Expr>> {
807 args.into_iter()
808 .map(|a| self.sql_fn_arg_to_logical_expr(a, schema, planner_context))
809 .collect::<Result<Vec<Expr>>>()
810 }
811
812 pub(super) fn function_args_to_expr_with_names(
813 &self,
814 args: Vec<FunctionArg>,
815 schema: &DFSchema,
816 planner_context: &mut PlannerContext,
817 ) -> Result<(Vec<Expr>, Vec<Option<String>>)> {
818 let results: Result<Vec<(Expr, Option<String>)>> = args
819 .into_iter()
820 .map(|a| {
821 self.sql_fn_arg_to_logical_expr_with_name(a, schema, planner_context)
822 })
823 .collect();
824
825 let pairs = results?;
826 let (exprs, names): (Vec<Expr>, Vec<Option<String>>) = pairs.into_iter().unzip();
827 Ok((exprs, names))
828 }
829
830 pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) -> Result<()> {
831 match arg.get_type(schema)? {
833 DataType::List(_)
834 | DataType::LargeList(_)
835 | DataType::FixedSizeList(_, _)
836 | DataType::Struct(_) => Ok(()),
837 DataType::Null => {
838 not_impl_err!("unnest() does not support null yet")
839 }
840 _ => {
841 plan_err!("unnest() can only be applied to array, struct and null")
842 }
843 }
844 }
845}