datafusion_sql/expr/
function.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19
20use arrow::datatypes::DataType;
21use datafusion_common::{
22    internal_datafusion_err, internal_err, not_impl_err, plan_datafusion_err, plan_err,
23    DFSchema, Dependency, Diagnostic, Result, Span,
24};
25use datafusion_expr::expr::{Lambda, ScalarFunction, Unnest};
26use datafusion_expr::expr::{NullTreatment, WildcardOptions, WindowFunction};
27use datafusion_expr::planner::PlannerResult;
28use datafusion_expr::planner::{RawAggregateExpr, RawWindowExpr};
29use datafusion_expr::{expr, Expr, ExprSchemable, WindowFrame, WindowFunctionDefinition};
30use sqlparser::ast::{
31    DuplicateTreatment, Expr as SQLExpr, Function as SQLFunction, FunctionArg,
32    FunctionArgExpr, FunctionArgumentClause, FunctionArgumentList, FunctionArguments,
33    ObjectName, OrderByExpr, Spanned, WindowType,
34};
35
36/// Suggest a valid function based on an invalid input function name
37///
38/// Returns `None` if no valid matches are found. This happens when there are no
39/// functions registered with the context.
40pub fn suggest_valid_function(
41    input_function_name: &str,
42    is_window_func: bool,
43    ctx: &dyn ContextProvider,
44) -> Option<String> {
45    let valid_funcs = if is_window_func {
46        // All aggregate functions and builtin window functions
47        let mut funcs = Vec::new();
48
49        funcs.extend(ctx.udaf_names());
50        funcs.extend(ctx.udwf_names());
51
52        funcs
53    } else {
54        // All scalar functions and aggregate functions
55        let mut funcs = Vec::new();
56
57        funcs.extend(ctx.udf_names());
58        funcs.extend(ctx.udaf_names());
59
60        funcs
61    };
62    find_closest_match(valid_funcs, input_function_name)
63}
64
65/// Find the closest matching string to the target string in the candidates list, using edit distance(case insensitive)
66/// Input `candidates` must not be empty otherwise an error is returned.
67fn find_closest_match(candidates: Vec<String>, target: &str) -> Option<String> {
68    let target = target.to_lowercase();
69    candidates.into_iter().min_by_key(|candidate| {
70        datafusion_common::utils::datafusion_strsim::levenshtein(
71            &candidate.to_lowercase(),
72            &target,
73        )
74    })
75}
76
77/// Arguments for a function call extracted from the SQL AST
78#[derive(Debug)]
79struct FunctionArgs {
80    /// Function name
81    name: ObjectName,
82    /// Argument expressions
83    args: Vec<FunctionArg>,
84    /// ORDER BY clause, if any
85    order_by: Vec<OrderByExpr>,
86    /// OVER clause, if any
87    over: Option<WindowType>,
88    /// FILTER clause, if any
89    filter: Option<Box<SQLExpr>>,
90    /// NULL treatment clause, if any
91    null_treatment: Option<NullTreatment>,
92    /// DISTINCT
93    distinct: bool,
94    /// WITHIN GROUP clause, if any
95    within_group: Vec<OrderByExpr>,
96    /// Was the function called without parenthesis, i.e. could this also be a column reference?
97    function_without_parentheses: bool,
98}
99
100impl FunctionArgs {
101    fn try_new(function: SQLFunction) -> Result<Self> {
102        let SQLFunction {
103            name,
104            args,
105            over,
106            filter,
107            mut null_treatment,
108            within_group,
109            ..
110        } = function;
111
112        // Handle no argument form (aka `current_time`  as opposed to `current_time()`)
113        let FunctionArguments::List(args) = args else {
114            return Ok(Self {
115                name,
116                args: vec![],
117                order_by: vec![],
118                over,
119                filter,
120                null_treatment: null_treatment.map(|v| v.into()),
121                distinct: false,
122                within_group,
123                function_without_parentheses: matches!(args, FunctionArguments::None),
124            });
125        };
126
127        let FunctionArgumentList {
128            duplicate_treatment,
129            args,
130            clauses,
131        } = args;
132
133        let distinct = match duplicate_treatment {
134            Some(DuplicateTreatment::Distinct) => true,
135            Some(DuplicateTreatment::All) => false,
136            None => false,
137        };
138
139        // Pull out argument handling
140        let mut order_by = None;
141        for clause in clauses {
142            match clause {
143                FunctionArgumentClause::IgnoreOrRespectNulls(nt) => {
144                    if null_treatment.is_some() {
145                        return not_impl_err!(
146                            "Calling {name}: Duplicated null treatment clause"
147                        );
148                    }
149                    null_treatment = Some(nt);
150                }
151                FunctionArgumentClause::OrderBy(oby) => {
152                    if order_by.is_some() {
153                        if !within_group.is_empty() {
154                            return plan_err!("ORDER BY clause is only permitted in WITHIN GROUP clause when a WITHIN GROUP is used");
155                        }
156                        return not_impl_err!("Calling {name}: Duplicated ORDER BY clause in function arguments");
157                    }
158                    order_by = Some(oby);
159                }
160                FunctionArgumentClause::Limit(limit) => {
161                    return not_impl_err!(
162                        "Calling {name}: LIMIT not supported in function arguments: {limit}"
163                    )
164                }
165                FunctionArgumentClause::OnOverflow(overflow) => {
166                    return not_impl_err!(
167                        "Calling {name}: ON OVERFLOW not supported in function arguments: {overflow}"
168                    )
169                }
170                FunctionArgumentClause::Having(having) => {
171                    return not_impl_err!(
172                        "Calling {name}: HAVING not supported in function arguments: {having}"
173                    )
174                }
175                FunctionArgumentClause::Separator(sep) => {
176                    return not_impl_err!(
177                        "Calling {name}: SEPARATOR not supported in function arguments: {sep}"
178                    )
179                }
180                FunctionArgumentClause::JsonNullClause(jn) => {
181                    return not_impl_err!(
182                        "Calling {name}: JSON NULL clause not supported in function arguments: {jn}"
183                    )
184                }
185                FunctionArgumentClause::JsonReturningClause(jr) => {
186                    return not_impl_err!(
187                        "Calling {name}: JSON RETURNING clause not supported in function arguments: {jr}"
188                    )
189                },
190            }
191        }
192
193        if within_group.len() > 1 {
194            return not_impl_err!(
195                "Only a single ordering expression is permitted in a WITHIN GROUP clause"
196            );
197        }
198
199        let order_by = order_by.unwrap_or_default();
200
201        Ok(Self {
202            name,
203            args,
204            order_by,
205            over,
206            filter,
207            null_treatment: null_treatment.map(|v| v.into()),
208            distinct,
209            within_group,
210            function_without_parentheses: false,
211        })
212    }
213}
214
215impl<S: ContextProvider> SqlToRel<'_, S> {
216    pub(super) fn sql_function_to_expr(
217        &self,
218        function: SQLFunction,
219        schema: &DFSchema,
220        planner_context: &mut PlannerContext,
221    ) -> Result<Expr> {
222        let function_args = FunctionArgs::try_new(function)?;
223        let FunctionArgs {
224            name: object_name,
225            args,
226            order_by,
227            over,
228            filter,
229            null_treatment,
230            distinct,
231            within_group,
232            function_without_parentheses,
233        } = function_args;
234
235        if over.is_some() && !within_group.is_empty() {
236            return plan_err!("OVER and WITHIN GROUP clause cannot be used together. \
237                OVER is for window functions, whereas WITHIN GROUP is for ordered set aggregate functions");
238        }
239
240        if !order_by.is_empty() && !within_group.is_empty() {
241            return plan_err!("ORDER BY and WITHIN GROUP clauses cannot be used together in the same aggregate function");
242        }
243
244        // If function is a window function (it has an OVER clause),
245        // it shouldn't have ordering requirement as function argument
246        // required ordering should be defined in OVER clause.
247        let is_function_window = over.is_some();
248        let sql_parser_span = object_name.0[0].span();
249        let name = if object_name.0.len() > 1 {
250            // DF doesn't handle compound identifiers
251            // (e.g. "foo.bar") for function names yet
252            object_name.to_string()
253        } else {
254            match object_name.0[0].as_ident() {
255                Some(ident) => crate::utils::normalize_ident(ident.clone()),
256                None => {
257                    return plan_err!(
258                        "Expected an identifier in function name, but found {:?}",
259                        object_name.0[0]
260                    )
261                }
262            }
263        };
264
265        if name.eq("make_map") {
266            let mut fn_args =
267                self.function_args_to_expr(args.clone(), schema, planner_context)?;
268            for planner in self.context_provider.get_expr_planners().iter() {
269                match planner.plan_make_map(fn_args)? {
270                    PlannerResult::Planned(expr) => return Ok(expr),
271                    PlannerResult::Original(args) => fn_args = args,
272                }
273            }
274        }
275        // User-defined function (UDF) should have precedence
276        if let Some(fm) = self.context_provider.get_function_meta(&name) {
277            let (args, arg_names) =
278                self.function_args_to_expr_with_names(args, schema, planner_context)?;
279
280            let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
281                if let Some(param_names) = &fm.signature().parameter_names {
282                    datafusion_expr::arguments::resolve_function_arguments(
283                        param_names,
284                        args,
285                        arg_names,
286                    )?
287                } else {
288                    return plan_err!(
289                        "Function '{}' does not support named arguments",
290                        fm.name()
291                    );
292                }
293            } else {
294                args
295            };
296
297            // After resolution, all arguments are positional
298            let inner = ScalarFunction::new_udf(fm, resolved_args);
299
300            if name.eq_ignore_ascii_case(inner.name()) {
301                return Ok(Expr::ScalarFunction(inner));
302            } else {
303                // If the function is called by an alias, a verbose string representation is created
304                // (e.g., "my_alias(arg1, arg2)") and the expression is wrapped in an `Alias`
305                // to ensure the output column name matches the user's query.
306                let arg_names = inner
307                    .args
308                    .iter()
309                    .map(|arg| arg.to_string())
310                    .collect::<Vec<_>>()
311                    .join(",");
312                let verbose_alias = format!("{name}({arg_names})");
313
314                return Ok(Expr::ScalarFunction(inner).alias(verbose_alias));
315            }
316        }
317
318        // Build Unnest expression
319        if name.eq("unnest") {
320            let mut exprs = self.function_args_to_expr(args, schema, planner_context)?;
321            if exprs.len() != 1 {
322                return plan_err!("unnest() requires exactly one argument");
323            }
324            let expr = exprs.swap_remove(0);
325            Self::check_unnest_arg(&expr, schema)?;
326            return Ok(Expr::Unnest(Unnest::new(expr)));
327        }
328
329        if !order_by.is_empty() && is_function_window {
330            return plan_err!(
331                "Aggregate ORDER BY is not implemented for window functions"
332            );
333        }
334        // Then, window function
335        if let Some(WindowType::WindowSpec(window)) = over {
336            let partition_by = window
337                .partition_by
338                .into_iter()
339                // Ignore window spec PARTITION BY for scalar values
340                // as they do not change and thus do not generate new partitions
341                .filter(|e| !matches!(e, sqlparser::ast::Expr::Value { .. },))
342                .map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
343                .collect::<Result<Vec<_>>>()?;
344            let mut order_by = self.order_by_to_sort_expr(
345                window.order_by,
346                schema,
347                planner_context,
348                // Numeric literals in window function ORDER BY are treated as constants
349                false,
350                None,
351            )?;
352
353            let func_deps = schema.functional_dependencies();
354            // Find whether ties are possible in the given ordering
355            let is_ordering_strict = order_by.iter().find_map(|orderby_expr| {
356                if let Expr::Column(col) = &orderby_expr.expr {
357                    let idx = schema.index_of_column(col).ok()?;
358                    return if func_deps.iter().any(|dep| {
359                        dep.source_indices == vec![idx] && dep.mode == Dependency::Single
360                    }) {
361                        Some(true)
362                    } else {
363                        Some(false)
364                    };
365                }
366                Some(false)
367            });
368
369            let window_frame = window
370                .window_frame
371                .as_ref()
372                .map(|window_frame| {
373                    let window_frame: WindowFrame = window_frame.clone().try_into()?;
374                    window_frame
375                        .regularize_order_bys(&mut order_by)
376                        .map(|_| window_frame)
377                })
378                .transpose()?;
379
380            let window_frame = if let Some(window_frame) = window_frame {
381                window_frame
382            } else if let Some(is_ordering_strict) = is_ordering_strict {
383                WindowFrame::new(Some(is_ordering_strict))
384            } else {
385                WindowFrame::new((!order_by.is_empty()).then_some(false))
386            };
387
388            if let Ok(fun) = self.find_window_func(&name) {
389                let (args, arg_names) =
390                    self.function_args_to_expr_with_names(args, schema, planner_context)?;
391
392                let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
393                    let signature = match &fun {
394                        WindowFunctionDefinition::AggregateUDF(udaf) => udaf.signature(),
395                        WindowFunctionDefinition::WindowUDF(udwf) => udwf.signature(),
396                    };
397
398                    if let Some(param_names) = &signature.parameter_names {
399                        datafusion_expr::arguments::resolve_function_arguments(
400                            param_names,
401                            args,
402                            arg_names,
403                        )?
404                    } else {
405                        return plan_err!(
406                            "Window function '{}' does not support named arguments",
407                            name
408                        );
409                    }
410                } else {
411                    args
412                };
413
414                // Plan FILTER clause if present
415                let filter = filter
416                    .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
417                    .transpose()?
418                    .map(Box::new);
419
420                let mut window_expr = RawWindowExpr {
421                    func_def: fun,
422                    args: resolved_args,
423                    partition_by,
424                    order_by,
425                    window_frame,
426                    filter,
427                    null_treatment,
428                    distinct: function_args.distinct,
429                };
430
431                for planner in self.context_provider.get_expr_planners().iter() {
432                    match planner.plan_window(window_expr)? {
433                        PlannerResult::Planned(expr) => return Ok(expr),
434                        PlannerResult::Original(expr) => window_expr = expr,
435                    }
436                }
437
438                let RawWindowExpr {
439                    func_def,
440                    args,
441                    partition_by,
442                    order_by,
443                    window_frame,
444                    filter,
445                    null_treatment,
446                    distinct,
447                } = window_expr;
448
449                let inner = WindowFunction {
450                    fun: func_def,
451                    params: expr::WindowFunctionParams {
452                        args,
453                        partition_by,
454                        order_by,
455                        window_frame,
456                        filter,
457                        null_treatment,
458                        distinct,
459                    },
460                };
461
462                if name.eq_ignore_ascii_case(inner.fun.name()) {
463                    return Ok(Expr::WindowFunction(Box::new(inner)));
464                } else {
465                    // If the function is called by an alias, a verbose string representation is created
466                    // (e.g., "my_alias(arg1, arg2)") and the expression is wrapped in an `Alias`
467                    // to ensure the output column name matches the user's query.
468                    let arg_names = inner
469                        .params
470                        .args
471                        .iter()
472                        .map(|arg| arg.to_string())
473                        .collect::<Vec<_>>()
474                        .join(",");
475                    let verbose_alias = format!("{name}({arg_names})");
476
477                    return Ok(Expr::WindowFunction(Box::new(inner)).alias(verbose_alias));
478                }
479            }
480        } else {
481            // User defined aggregate functions (UDAF) have precedence in case it has the same name as a scalar built-in function
482            if let Some(fm) = self.context_provider.get_aggregate_meta(&name) {
483                if null_treatment.is_some() && !fm.supports_null_handling_clause() {
484                    return plan_err!(
485                        "[IGNORE | RESPECT] NULLS are not permitted for {}",
486                        fm.name()
487                    );
488                }
489
490                let (mut args, mut arg_names) =
491                    self.function_args_to_expr_with_names(args, schema, planner_context)?;
492
493                let order_by = if fm.supports_within_group_clause() {
494                    let within_group = self.order_by_to_sort_expr(
495                        within_group,
496                        schema,
497                        planner_context,
498                        false,
499                        None,
500                    )?;
501
502                    // Add the WITHIN GROUP ordering expressions to the front of the argument list
503                    // So function(arg) WITHIN GROUP (ORDER BY x) becomes function(x, arg)
504                    if !within_group.is_empty() {
505                        // Prepend None arg names for each WITHIN GROUP expression
506                        let within_group_count = within_group.len();
507                        arg_names = std::iter::repeat_n(None, within_group_count)
508                            .chain(arg_names)
509                            .collect();
510
511                        args = within_group
512                            .iter()
513                            .map(|sort| sort.expr.clone())
514                            .chain(args)
515                            .collect::<Vec<_>>();
516                    }
517                    within_group
518                } else {
519                    let order_by = if !order_by.is_empty() {
520                        order_by
521                    } else {
522                        within_group
523                    };
524                    self.order_by_to_sort_expr(
525                        order_by,
526                        schema,
527                        planner_context,
528                        true,
529                        None,
530                    )?
531                };
532
533                let filter: Option<Box<Expr>> = filter
534                    .map(|e| self.sql_expr_to_logical_expr(*e, schema, planner_context))
535                    .transpose()?
536                    .map(Box::new);
537
538                let resolved_args = if arg_names.iter().any(|name| name.is_some()) {
539                    if let Some(param_names) = &fm.signature().parameter_names {
540                        datafusion_expr::arguments::resolve_function_arguments(
541                            param_names,
542                            args,
543                            arg_names,
544                        )?
545                    } else {
546                        return plan_err!(
547                            "Aggregate function '{}' does not support named arguments",
548                            fm.name()
549                        );
550                    }
551                } else {
552                    args
553                };
554
555                let mut aggregate_expr = RawAggregateExpr {
556                    func: fm,
557                    args: resolved_args,
558                    distinct,
559                    filter,
560                    order_by,
561                    null_treatment,
562                };
563                for planner in self.context_provider.get_expr_planners().iter() {
564                    match planner.plan_aggregate(aggregate_expr)? {
565                        PlannerResult::Planned(expr) => return Ok(expr),
566                        PlannerResult::Original(expr) => aggregate_expr = expr,
567                    }
568                }
569
570                let RawAggregateExpr {
571                    func,
572                    args,
573                    distinct,
574                    filter,
575                    order_by,
576                    null_treatment,
577                } = aggregate_expr;
578
579                let inner = expr::AggregateFunction::new_udf(
580                    func,
581                    args,
582                    distinct,
583                    filter,
584                    order_by,
585                    null_treatment,
586                );
587
588                if name.eq_ignore_ascii_case(inner.func.name()) {
589                    return Ok(Expr::AggregateFunction(inner));
590                } else {
591                    // If the function is called by an alias, a verbose string representation is created
592                    // (e.g., "my_alias(arg1, arg2)") and the expression is wrapped in an `Alias`
593                    // to ensure the output column name matches the user's query.
594                    let arg_names = inner
595                        .params
596                        .args
597                        .iter()
598                        .map(|arg| arg.to_string())
599                        .collect::<Vec<_>>()
600                        .join(",");
601                    let verbose_alias = format!("{name}({arg_names})");
602
603                    return Ok(Expr::AggregateFunction(inner).alias(verbose_alias));
604                }
605            }
606        }
607
608        // workaround for https://github.com/apache/datafusion-sqlparser-rs/issues/1909
609        if function_without_parentheses {
610            let maybe_ids = object_name
611                .0
612                .iter()
613                .map(|part| part.as_ident().cloned().ok_or(()))
614                .collect::<Result<Vec<_>, ()>>();
615            if let Ok(ids) = maybe_ids {
616                if ids.len() == 1 {
617                    return self.sql_identifier_to_expr(
618                        ids.into_iter().next().unwrap(),
619                        schema,
620                        planner_context,
621                    );
622                } else {
623                    return self.sql_compound_identifier_to_expr(
624                        ids,
625                        schema,
626                        planner_context,
627                    );
628                }
629            }
630        }
631
632        // Could not find the relevant function, so return an error
633        if let Some(suggested_func_name) =
634            suggest_valid_function(&name, is_function_window, self.context_provider)
635        {
636            let span = Span::try_from_sqlparser_span(sql_parser_span);
637            let mut diagnostic =
638                Diagnostic::new_error(format!("Invalid function '{name}'"), span);
639            diagnostic
640                .add_note(format!("Possible function '{suggested_func_name}'"), None);
641            plan_err!("Invalid function '{name}'.\nDid you mean '{suggested_func_name}'?"; diagnostic=diagnostic)
642        } else {
643            internal_err!("No functions registered with this context.")
644        }
645    }
646
647    pub(super) fn sql_fn_name_to_expr(
648        &self,
649        expr: SQLExpr,
650        fn_name: &str,
651        schema: &DFSchema,
652        planner_context: &mut PlannerContext,
653    ) -> Result<Expr> {
654        let fun = self
655            .context_provider
656            .get_function_meta(fn_name)
657            .ok_or_else(|| {
658                internal_datafusion_err!("Unable to find expected '{fn_name}' function")
659            })?;
660        let args = vec![self.sql_expr_to_logical_expr(expr, schema, planner_context)?];
661        Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
662    }
663
664    pub(super) fn find_window_func(
665        &self,
666        name: &str,
667    ) -> Result<WindowFunctionDefinition> {
668        // Check udaf first
669        let udaf = self.context_provider.get_aggregate_meta(name);
670        // Use the builtin window function instead of the user-defined aggregate function
671        if udaf.as_ref().is_some_and(|udaf| {
672            udaf.name() != "first_value"
673                && udaf.name() != "last_value"
674                && udaf.name() != "nth_value"
675        }) {
676            Ok(WindowFunctionDefinition::AggregateUDF(udaf.unwrap()))
677        } else {
678            self.context_provider
679                .get_window_meta(name)
680                .map(WindowFunctionDefinition::WindowUDF)
681                .ok_or_else(|| {
682                    plan_datafusion_err!("There is no window function named {name}")
683                })
684        }
685    }
686
687    fn sql_fn_arg_to_logical_expr(
688        &self,
689        sql: FunctionArg,
690        schema: &DFSchema,
691        planner_context: &mut PlannerContext,
692    ) -> Result<Expr> {
693        let (expr, _) =
694            self.sql_fn_arg_to_logical_expr_with_name(sql, schema, planner_context)?;
695        Ok(expr)
696    }
697
698    fn sql_fn_arg_to_logical_expr_with_name(
699        &self,
700        sql: FunctionArg,
701        schema: &DFSchema,
702        planner_context: &mut PlannerContext,
703    ) -> Result<(Expr, Option<String>)> {
704        match sql {
705            FunctionArg::Named {
706                name,
707                arg: FunctionArgExpr::Expr(arg),
708                operator: _,
709            } => {
710                let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
711                let arg_name = crate::utils::normalize_ident(name);
712                Ok((expr, Some(arg_name)))
713            }
714            FunctionArg::Named {
715                name,
716                arg: FunctionArgExpr::Wildcard,
717                operator: _,
718            } => {
719                #[expect(deprecated)]
720                let expr = Expr::Wildcard {
721                    qualifier: None,
722                    options: Box::new(WildcardOptions::default()),
723                };
724                let arg_name = crate::utils::normalize_ident(name);
725                Ok((expr, Some(arg_name)))
726            }
727            FunctionArg::Unnamed(FunctionArgExpr::Expr(SQLExpr::Lambda(
728                sqlparser::ast::LambdaFunction { params, body },
729            ))) => {
730                let params = params
731                    .into_iter()
732                    .map(|v| v.to_string())
733                    .collect::<Vec<_>>();
734
735                Ok((
736                    Expr::Lambda(Lambda {
737                        params: params.clone(),
738                        body: Box::new(self.sql_expr_to_logical_expr(
739                            *body,
740                            schema,
741                            &mut planner_context.clone().with_lambda_parameters(params),
742                        )?),
743                    }),
744                    None,
745                ))
746            }
747            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
748                let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
749                Ok((expr, None))
750            }
751            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
752                #[expect(deprecated)]
753                let expr = Expr::Wildcard {
754                    qualifier: None,
755                    options: Box::new(WildcardOptions::default()),
756                };
757                Ok((expr, None))
758            }
759            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(object_name)) => {
760                let qualifier = self.object_name_to_table_reference(object_name)?;
761                // Sanity check on qualifier with schema
762                let qualified_indices = schema.fields_indices_with_qualified(&qualifier);
763                if qualified_indices.is_empty() {
764                    return plan_err!("Invalid qualifier {qualifier}");
765                }
766
767                #[expect(deprecated)]
768                let expr = Expr::Wildcard {
769                    qualifier: qualifier.into(),
770                    options: Box::new(WildcardOptions::default()),
771                };
772                Ok((expr, None))
773            }
774            // PostgreSQL dialect uses ExprNamed variant with expression for name
775            FunctionArg::ExprNamed {
776                name: SQLExpr::Identifier(name),
777                arg: FunctionArgExpr::Expr(arg),
778                operator: _,
779            } => {
780                let expr = self.sql_expr_to_logical_expr(arg, schema, planner_context)?;
781                let arg_name = crate::utils::normalize_ident(name);
782                Ok((expr, Some(arg_name)))
783            }
784            FunctionArg::ExprNamed {
785                name: SQLExpr::Identifier(name),
786                arg: FunctionArgExpr::Wildcard,
787                operator: _,
788            } => {
789                #[expect(deprecated)]
790                let expr = Expr::Wildcard {
791                    qualifier: None,
792                    options: Box::new(WildcardOptions::default()),
793                };
794                let arg_name = crate::utils::normalize_ident(name);
795                Ok((expr, Some(arg_name)))
796            }
797            _ => not_impl_err!("Unsupported qualified wildcard argument: {sql:?}"),
798        }
799    }
800
801    pub(super) fn function_args_to_expr(
802        &self,
803        args: Vec<FunctionArg>,
804        schema: &DFSchema,
805        planner_context: &mut PlannerContext,
806    ) -> Result<Vec<Expr>> {
807        args.into_iter()
808            .map(|a| self.sql_fn_arg_to_logical_expr(a, schema, planner_context))
809            .collect::<Result<Vec<Expr>>>()
810    }
811
812    pub(super) fn function_args_to_expr_with_names(
813        &self,
814        args: Vec<FunctionArg>,
815        schema: &DFSchema,
816        planner_context: &mut PlannerContext,
817    ) -> Result<(Vec<Expr>, Vec<Option<String>>)> {
818        let results: Result<Vec<(Expr, Option<String>)>> = args
819            .into_iter()
820            .map(|a| {
821                self.sql_fn_arg_to_logical_expr_with_name(a, schema, planner_context)
822            })
823            .collect();
824
825        let pairs = results?;
826        let (exprs, names): (Vec<Expr>, Vec<Option<String>>) = pairs.into_iter().unzip();
827        Ok((exprs, names))
828    }
829
830    pub(crate) fn check_unnest_arg(arg: &Expr, schema: &DFSchema) -> Result<()> {
831        // Check argument type, array types are supported
832        match arg.get_type(schema)? {
833            DataType::List(_)
834            | DataType::LargeList(_)
835            | DataType::FixedSizeList(_, _)
836            | DataType::Struct(_) => Ok(()),
837            DataType::Null => {
838                not_impl_err!("unnest() does not support null yet")
839            }
840            _ => {
841                plan_err!("unnest() can only be applied to array, struct and null")
842            }
843        }
844    }
845}