datafusion_sql/
select.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::collections::HashSet;
19use std::ops::ControlFlow;
20use std::sync::Arc;
21
22use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
23use crate::query::to_order_by_exprs_with_select;
24use crate::utils::{
25    check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
26    resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnests_bottom_up,
27    CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose,
28};
29
30use datafusion_common::error::DataFusionErrorBuilder;
31use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
32use datafusion_common::{not_impl_err, plan_err, Result};
33use datafusion_common::{RecursionUnnestOption, UnnestOptions};
34use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
35use datafusion_expr::expr_rewriter::{
36    normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
37};
38use datafusion_expr::select_expr::SelectExpr;
39use datafusion_expr::utils::{
40    expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
41};
42use datafusion_expr::{
43    Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
44    LogicalPlanBuilderOptions, Partitioning,
45};
46
47use indexmap::IndexMap;
48use sqlparser::ast::{
49    visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr,
50    OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
51};
52use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};
53
54impl<S: ContextProvider> SqlToRel<'_, S> {
55    /// Generate a logic plan from an SQL select
56    pub(super) fn select_to_plan(
57        &self,
58        mut select: Select,
59        query_order_by: Option<OrderBy>,
60        planner_context: &mut PlannerContext,
61    ) -> Result<LogicalPlan> {
62        // Check for unsupported syntax first
63        if !select.cluster_by.is_empty() {
64            return not_impl_err!("CLUSTER BY");
65        }
66        if !select.lateral_views.is_empty() {
67            return not_impl_err!("LATERAL VIEWS");
68        }
69
70        if select.top.is_some() {
71            return not_impl_err!("TOP");
72        }
73        if !select.sort_by.is_empty() {
74            return not_impl_err!("SORT BY");
75        }
76
77        // Process `from` clause
78        let plan = self.plan_from_tables(select.from, planner_context)?;
79        let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
80
81        // Process `where` clause
82        let base_plan = self.plan_selection(select.selection, plan, planner_context)?;
83
84        // Handle named windows before processing the projection expression
85        check_conflicting_windows(&select.named_window)?;
86        self.match_window_definitions(&mut select.projection, &select.named_window)?;
87
88        // Process the SELECT expressions
89        let select_exprs = self.prepare_select_exprs(
90            &base_plan,
91            select.projection,
92            empty_from,
93            planner_context,
94        )?;
95
96        // Having and group by clause may reference aliases defined in select projection
97        let projected_plan = self.project(base_plan.clone(), select_exprs)?;
98        let select_exprs = projected_plan.expressions();
99
100        let order_by =
101            to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?;
102
103        // Place the fields of the base plan at the front so that when there are references
104        // with the same name, the fields of the base plan will be searched first.
105        // See https://github.com/apache/datafusion/issues/9162
106        let mut combined_schema = base_plan.schema().as_ref().clone();
107        combined_schema.merge(projected_plan.schema());
108
109        // Order-by expressions prioritize referencing columns from the select list,
110        // then from the FROM clause.
111        let order_by_rex = self.order_by_to_sort_expr(
112            order_by,
113            projected_plan.schema().as_ref(),
114            planner_context,
115            true,
116            Some(base_plan.schema().as_ref()),
117        )?;
118        let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?;
119
120        // This alias map is resolved and looked up in both having exprs and group by exprs
121        let alias_map = extract_aliases(&select_exprs);
122
123        // Optionally the HAVING expression.
124        let having_expr_opt = select
125            .having
126            .map::<Result<Expr>, _>(|having_expr| {
127                let having_expr = self.sql_expr_to_logical_expr(
128                    having_expr,
129                    &combined_schema,
130                    planner_context,
131                )?;
132                // This step "dereferences" any aliases in the HAVING clause.
133                //
134                // This is how we support queries with HAVING expressions that
135                // refer to aliased columns.
136                //
137                // For example:
138                //
139                //   SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING m > 10;
140                //
141                // are rewritten as, respectively:
142                //
143                //   SELECT c1, MAX(c2) AS m FROM t GROUP BY c1 HAVING MAX(c2) > 10;
144                //
145                let having_expr = resolve_aliases_to_exprs(having_expr, &alias_map)?;
146                normalize_col(having_expr, &projected_plan)
147            })
148            .transpose()?;
149
150        // All of the group by expressions
151        let group_by_exprs = if let GroupByExpr::Expressions(exprs, _) = select.group_by {
152            exprs
153                .into_iter()
154                .map(|e| {
155                    let group_by_expr = self.sql_expr_to_logical_expr(
156                        e,
157                        &combined_schema,
158                        planner_context,
159                    )?;
160
161                    // Aliases from the projection can conflict with same-named expressions in the input
162                    let mut alias_map = alias_map.clone();
163                    for f in base_plan.schema().fields() {
164                        alias_map.remove(f.name());
165                    }
166                    let group_by_expr =
167                        resolve_aliases_to_exprs(group_by_expr, &alias_map)?;
168                    let group_by_expr =
169                        resolve_positions_to_exprs(group_by_expr, &select_exprs)?;
170                    let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
171                    self.validate_schema_satisfies_exprs(
172                        base_plan.schema(),
173                        std::slice::from_ref(&group_by_expr),
174                    )?;
175                    Ok(group_by_expr)
176                })
177                .collect::<Result<Vec<Expr>>>()?
178        } else {
179            // 'group by all' groups wrt. all select expressions except 'AggregateFunction's.
180            // Filter and collect non-aggregate select expressions
181            select_exprs
182                .iter()
183                .filter(|select_expr| match select_expr {
184                    Expr::AggregateFunction(_) => false,
185                    Expr::Alias(Alias { expr, name: _, .. }) => {
186                        !matches!(**expr, Expr::AggregateFunction(_))
187                    }
188                    _ => true,
189                })
190                .cloned()
191                .collect()
192        };
193
194        // Optionally the QUALIFY expression.
195        let qualify_expr_opt = select
196            .qualify
197            .map::<Result<Expr>, _>(|qualify_expr| {
198                let qualify_expr = self.sql_expr_to_logical_expr(
199                    qualify_expr,
200                    &combined_schema,
201                    planner_context,
202                )?;
203                // This step "dereferences" any aliases in the QUALIFY clause.
204                //
205                // This is how we support queries with QUALIFY expressions that
206                // refer to aliased columns.
207                //
208                // For example:
209                //
210                //   select row_number() over (PARTITION BY id) as rk from users qualify rk > 1;
211                //
212                // are rewritten as, respectively:
213                //
214                //   select row_number() over (PARTITION BY id) as rk from users qualify row_number() over (PARTITION BY id) > 1;
215                //
216                let qualify_expr = resolve_aliases_to_exprs(qualify_expr, &alias_map)?;
217                normalize_col(qualify_expr, &projected_plan)
218            })
219            .transpose()?;
220
221        // The outer expressions we will search through for aggregates.
222        // Aggregates may be sourced from the SELECT list or from the HAVING expression.
223        let aggr_expr_haystack = select_exprs
224            .iter()
225            .chain(having_expr_opt.iter())
226            .chain(qualify_expr_opt.iter());
227        // All of the aggregate expressions (deduplicated).
228        let aggr_exprs = find_aggregate_exprs(aggr_expr_haystack);
229
230        // Process group by, aggregation or having
231        let (
232            plan,
233            mut select_exprs_post_aggr,
234            having_expr_post_aggr,
235            qualify_expr_post_aggr,
236        ) = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
237            self.aggregate(
238                &base_plan,
239                &select_exprs,
240                having_expr_opt.as_ref(),
241                qualify_expr_opt.as_ref(),
242                &group_by_exprs,
243                &aggr_exprs,
244            )?
245        } else {
246            match having_expr_opt {
247                Some(having_expr) => return plan_err!("HAVING clause references: {having_expr} must appear in the GROUP BY clause or be used in an aggregate function"),
248                None => (base_plan.clone(), select_exprs.clone(), having_expr_opt, qualify_expr_opt)
249            }
250        };
251
252        let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr {
253            LogicalPlanBuilder::from(plan)
254                .having(having_expr_post_aggr)?
255                .build()?
256        } else {
257            plan
258        };
259
260        // The outer expressions we will search through for window functions.
261        // Window functions may be sourced from the SELECT list or from the QUALIFY expression.
262        let windows_expr_haystack = select_exprs_post_aggr
263            .iter()
264            .chain(qualify_expr_post_aggr.iter());
265        // All of the window expressions (deduplicated and rewritten to reference aggregates as
266        // columns from input).
267        let window_func_exprs = find_window_exprs(windows_expr_haystack);
268
269        // Process window functions after aggregation as they can reference
270        // aggregate functions in their body
271        let plan = if window_func_exprs.is_empty() {
272            plan
273        } else {
274            let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?;
275
276            // Re-write the projection
277            select_exprs_post_aggr = select_exprs_post_aggr
278                .iter()
279                .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
280                .collect::<Result<Vec<Expr>>>()?;
281
282            plan
283        };
284
285        // Process QUALIFY clause after window functions
286        // QUALIFY filters the results of window functions, similar to how HAVING filters aggregates
287        let plan = if let Some(qualify_expr) = qualify_expr_post_aggr {
288            // Validate that QUALIFY is used with window functions
289            if window_func_exprs.is_empty() {
290                return plan_err!(
291                    "QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
292                );
293            }
294
295            // now attempt to resolve columns and replace with fully-qualified columns
296            let windows_projection_exprs = window_func_exprs
297                .iter()
298                .map(|expr| resolve_columns(expr, &plan))
299                .collect::<Result<Vec<Expr>>>()?;
300
301            // Rewrite the qualify expression to reference columns from the window plan
302            let qualify_expr_post_window =
303                rebase_expr(&qualify_expr, &windows_projection_exprs, &plan)?;
304
305            // Validate that the qualify expression can be resolved from the window plan schema
306            self.validate_schema_satisfies_exprs(
307                plan.schema(),
308                std::slice::from_ref(&qualify_expr_post_window),
309            )?;
310
311            LogicalPlanBuilder::from(plan)
312                .filter(qualify_expr_post_window)?
313                .build()?
314        } else {
315            plan
316        };
317
318        // Try processing unnest expression or do the final projection
319        let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
320
321        // Process distinct clause
322        let plan = match select.distinct {
323            None => Ok(plan),
324            Some(Distinct::Distinct) => {
325                LogicalPlanBuilder::from(plan).distinct()?.build()
326            }
327            Some(Distinct::On(on_expr)) => {
328                if !aggr_exprs.is_empty()
329                    || !group_by_exprs.is_empty()
330                    || !window_func_exprs.is_empty()
331                {
332                    return not_impl_err!("DISTINCT ON expressions with GROUP BY, aggregation or window functions are not supported ");
333                }
334
335                let on_expr = on_expr
336                    .into_iter()
337                    .map(|e| {
338                        self.sql_expr_to_logical_expr(e, plan.schema(), planner_context)
339                    })
340                    .collect::<Result<Vec<_>>>()?;
341
342                // Build the final plan
343                LogicalPlanBuilder::from(base_plan)
344                    .distinct_on(on_expr, select_exprs, None)?
345                    .build()
346            }
347        }?;
348
349        // DISTRIBUTE BY
350        let plan = if !select.distribute_by.is_empty() {
351            let x = select
352                .distribute_by
353                .iter()
354                .map(|e| {
355                    self.sql_expr_to_logical_expr(
356                        e.clone(),
357                        &combined_schema,
358                        planner_context,
359                    )
360                })
361                .collect::<Result<Vec<_>>>()?;
362            LogicalPlanBuilder::from(plan)
363                .repartition(Partitioning::DistributeBy(x))?
364                .build()?
365        } else {
366            plan
367        };
368
369        self.order_by(plan, order_by_rex)
370    }
371
372    /// Try converting Expr(Unnest(Expr)) to Projection/Unnest/Projection
373    pub(super) fn try_process_unnest(
374        &self,
375        input: LogicalPlan,
376        select_exprs: Vec<Expr>,
377    ) -> Result<LogicalPlan> {
378        // Try process group by unnest
379        let input = self.try_process_aggregate_unnest(input)?;
380
381        let mut intermediate_plan = input;
382        let mut intermediate_select_exprs = select_exprs;
383        // Fast path: If there is are no unnests in the select_exprs, wrap the plan in a projection
384        if !intermediate_select_exprs
385            .iter()
386            .any(has_unnest_expr_recursively)
387        {
388            return LogicalPlanBuilder::from(intermediate_plan)
389                .project(intermediate_select_exprs)?
390                .build();
391        }
392
393        // Each expr in select_exprs can contains multiple unnest stage
394        // The transformation happen bottom up, one at a time for each iteration
395        // Only exhaust the loop if no more unnest transformation is found
396        for i in 0.. {
397            let mut unnest_columns = IndexMap::new();
398            // from which column used for projection, before the unnest happen
399            // including non unnest column and unnest column
400            let mut inner_projection_exprs = vec![];
401
402            // expr returned here maybe different from the originals in inner_projection_exprs
403            // for example:
404            // - unnest(struct_col) will be transformed into unnest(struct_col).field1, unnest(struct_col).field2
405            // - unnest(array_col) will be transformed into unnest(array_col).element
406            // - unnest(array_col) + 1 will be transformed into unnest(array_col).element +1
407            let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
408                &intermediate_plan,
409                &mut unnest_columns,
410                &mut inner_projection_exprs,
411                &intermediate_select_exprs,
412            )?;
413
414            // No more unnest is possible
415            if unnest_columns.is_empty() {
416                // The original expr does not contain any unnest
417                if i == 0 {
418                    return LogicalPlanBuilder::from(intermediate_plan)
419                        .project(intermediate_select_exprs)?
420                        .build();
421                }
422                break;
423            } else {
424                // Set preserve_nulls to false to ensure compatibility with DuckDB and PostgreSQL
425                let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false);
426                let mut unnest_col_vec = vec![];
427
428                for (col, maybe_list_unnest) in unnest_columns.into_iter() {
429                    if let Some(list_unnest) = maybe_list_unnest {
430                        unnest_options = list_unnest.into_iter().fold(
431                            unnest_options,
432                            |options, unnest_list| {
433                                options.with_recursions(RecursionUnnestOption {
434                                    input_column: col.clone(),
435                                    output_column: unnest_list.output_column,
436                                    depth: unnest_list.depth,
437                                })
438                            },
439                        );
440                    }
441                    unnest_col_vec.push(col);
442                }
443                let plan = LogicalPlanBuilder::from(intermediate_plan)
444                    .project(inner_projection_exprs)?
445                    .unnest_columns_with_options(unnest_col_vec, unnest_options)?
446                    .build()?;
447                intermediate_plan = plan;
448                intermediate_select_exprs = outer_projection_exprs;
449            }
450        }
451
452        LogicalPlanBuilder::from(intermediate_plan)
453            .project(intermediate_select_exprs)?
454            .build()
455    }
456
457    fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
458        match input {
459            // Fast path if there are no unnest in group by
460            LogicalPlan::Aggregate(ref agg)
461                if !&agg.group_expr.iter().any(has_unnest_expr_recursively) =>
462            {
463                Ok(input)
464            }
465            LogicalPlan::Aggregate(agg) => {
466                let agg_expr = agg.aggr_expr.clone();
467                let (new_input, new_group_by_exprs) =
468                    self.try_process_group_by_unnest(agg)?;
469                let options = LogicalPlanBuilderOptions::new()
470                    .with_add_implicit_group_by_exprs(true);
471                LogicalPlanBuilder::from(new_input)
472                    .with_options(options)
473                    .aggregate(new_group_by_exprs, agg_expr)?
474                    .build()
475            }
476            LogicalPlan::Filter(mut filter) => {
477                filter.input =
478                    Arc::new(self.try_process_aggregate_unnest(Arc::unwrap_or_clone(
479                        filter.input,
480                    ))?);
481                Ok(LogicalPlan::Filter(filter))
482            }
483            _ => Ok(input),
484        }
485    }
486
487    /// Try converting Unnest(Expr) of group by to Unnest/Projection.
488    /// Return the new input and group_by_exprs of Aggregate.
489    /// Select exprs can be different from agg exprs, for example:
490    fn try_process_group_by_unnest(
491        &self,
492        agg: Aggregate,
493    ) -> Result<(LogicalPlan, Vec<Expr>)> {
494        let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
495
496        let Aggregate {
497            input,
498            group_expr,
499            aggr_expr,
500            ..
501        } = agg;
502
503        // Process unnest of group_by_exprs, and input of agg will be rewritten
504        // for example:
505        //
506        // ```
507        // Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
508        //   TableScan: tab
509        // ```
510        //
511        // will be transformed into
512        //
513        // ```
514        // Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
515        //   Unnest: lists[unnest(tab.array_col)] structs[]
516        //     Projection: tab.array_col AS unnest(tab.array_col)
517        //       TableScan: tab
518        // ```
519        let mut intermediate_plan = Arc::unwrap_or_clone(input);
520        let mut intermediate_select_exprs = group_expr;
521
522        loop {
523            let mut unnest_columns = IndexMap::new();
524            let mut inner_projection_exprs = vec![];
525
526            let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
527                &intermediate_plan,
528                &mut unnest_columns,
529                &mut inner_projection_exprs,
530                &intermediate_select_exprs,
531            )?;
532
533            if unnest_columns.is_empty() {
534                break;
535            } else {
536                let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false);
537
538                let mut projection_exprs = match &aggr_expr_using_columns {
539                    Some(exprs) => (*exprs).clone(),
540                    None => {
541                        let mut columns = HashSet::new();
542                        for expr in &aggr_expr {
543                            expr.apply_with_lambdas_params(|expr, lambdas_params| {
544                                if let Expr::Column(c) = expr {
545                                    if !c.is_lambda_parameter(lambdas_params) {
546                                        columns.insert(Expr::Column(c.clone()));
547                                    }
548                                }
549                                Ok(TreeNodeRecursion::Continue)
550                            })
551                            // As the closure always returns Ok, this "can't" error
552                            .expect("Unexpected error");
553                        }
554                        aggr_expr_using_columns = Some(columns.clone());
555                        columns
556                    }
557                };
558                projection_exprs.extend(inner_projection_exprs);
559
560                let mut unnest_col_vec = vec![];
561
562                for (col, maybe_list_unnest) in unnest_columns.into_iter() {
563                    if let Some(list_unnest) = maybe_list_unnest {
564                        unnest_options = list_unnest.into_iter().fold(
565                            unnest_options,
566                            |options, unnest_list| {
567                                options.with_recursions(RecursionUnnestOption {
568                                    input_column: col.clone(),
569                                    output_column: unnest_list.output_column,
570                                    depth: unnest_list.depth,
571                                })
572                            },
573                        );
574                    }
575                    unnest_col_vec.push(col);
576                }
577
578                intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
579                    .project(projection_exprs)?
580                    .unnest_columns_with_options(unnest_col_vec, unnest_options)?
581                    .build()?;
582
583                intermediate_select_exprs = outer_projection_exprs;
584            }
585        }
586
587        Ok((intermediate_plan, intermediate_select_exprs))
588    }
589
590    pub(crate) fn plan_selection(
591        &self,
592        selection: Option<SQLExpr>,
593        plan: LogicalPlan,
594        planner_context: &mut PlannerContext,
595    ) -> Result<LogicalPlan> {
596        match selection {
597            Some(predicate_expr) => {
598                let fallback_schemas = plan.fallback_normalize_schemas();
599                let outer_query_schema = planner_context.outer_query_schema().cloned();
600                let outer_query_schema_vec = outer_query_schema
601                    .as_ref()
602                    .map(|schema| vec![schema])
603                    .unwrap_or_else(Vec::new);
604
605                let filter_expr =
606                    self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;
607
608                // Check for aggregation functions
609                let aggregate_exprs =
610                    find_aggregate_exprs(std::slice::from_ref(&filter_expr));
611                if !aggregate_exprs.is_empty() {
612                    return plan_err!(
613                        "Aggregate functions are not allowed in the WHERE clause. Consider using HAVING instead"
614                    );
615                }
616
617                let mut using_columns = HashSet::new();
618                expr_to_columns(&filter_expr, &mut using_columns)?;
619                let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
620                    filter_expr,
621                    &[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
622                    &[using_columns],
623                )?;
624
625                Ok(LogicalPlan::Filter(Filter::try_new(
626                    filter_expr,
627                    Arc::new(plan),
628                )?))
629            }
630            None => Ok(plan),
631        }
632    }
633
634    pub(crate) fn plan_from_tables(
635        &self,
636        mut from: Vec<TableWithJoins>,
637        planner_context: &mut PlannerContext,
638    ) -> Result<LogicalPlan> {
639        match from.len() {
640            0 => Ok(LogicalPlanBuilder::empty(true).build()?),
641            1 => {
642                let input = from.remove(0);
643                self.plan_table_with_joins(input, planner_context)
644            }
645            _ => {
646                let mut from = from.into_iter();
647
648                let mut left = LogicalPlanBuilder::from({
649                    let input = from.next().unwrap();
650                    self.plan_table_with_joins(input, planner_context)?
651                });
652                let old_outer_from_schema = {
653                    let left_schema = Some(Arc::clone(left.schema()));
654                    planner_context.set_outer_from_schema(left_schema)
655                };
656                for input in from {
657                    // Join `input` with the current result (`left`).
658                    let right = self.plan_table_with_joins(input, planner_context)?;
659                    left = left.cross_join(right)?;
660                    // Update the outer FROM schema.
661                    let left_schema = Some(Arc::clone(left.schema()));
662                    planner_context.set_outer_from_schema(left_schema);
663                }
664                planner_context.set_outer_from_schema(old_outer_from_schema);
665                left.build()
666            }
667        }
668    }
669
670    /// Returns the `Expr`'s corresponding to a SQL query's SELECT expressions.
671    pub(crate) fn prepare_select_exprs(
672        &self,
673        plan: &LogicalPlan,
674        projection: Vec<SelectItem>,
675        empty_from: bool,
676        planner_context: &mut PlannerContext,
677    ) -> Result<Vec<SelectExpr>> {
678        let mut prepared_select_exprs = vec![];
679        let mut error_builder = DataFusionErrorBuilder::new();
680
681        for expr in projection {
682            match self.sql_select_to_rex(expr, plan, empty_from, planner_context) {
683                Ok(expr) => prepared_select_exprs.push(expr),
684                Err(err) => error_builder.add_error(err),
685            }
686        }
687        error_builder.error_or(prepared_select_exprs)
688    }
689
690    /// Generate a relational expression from a select SQL expression
691    fn sql_select_to_rex(
692        &self,
693        sql: SelectItem,
694        plan: &LogicalPlan,
695        empty_from: bool,
696        planner_context: &mut PlannerContext,
697    ) -> Result<SelectExpr> {
698        match sql {
699            SelectItem::UnnamedExpr(expr) => {
700                let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?;
701                let col = normalize_col_with_schemas_and_ambiguity_check(
702                    expr,
703                    &[&[plan.schema()]],
704                    &plan.using_columns()?,
705                )?;
706
707                Ok(SelectExpr::Expression(col))
708            }
709            SelectItem::ExprWithAlias { expr, alias } => {
710                let select_expr =
711                    self.sql_to_expr(expr, plan.schema(), planner_context)?;
712                let col = normalize_col_with_schemas_and_ambiguity_check(
713                    select_expr,
714                    &[&[plan.schema()]],
715                    &plan.using_columns()?,
716                )?;
717                let name = self.ident_normalizer.normalize(alias);
718                // avoiding adding an alias if the column name is the same.
719                let expr = match &col {
720                    Expr::Column(column) if column.name.eq(&name) => col,
721                    _ => col.alias(name),
722                };
723
724                Ok(SelectExpr::Expression(expr))
725            }
726            SelectItem::Wildcard(options) => {
727                Self::check_wildcard_options(&options)?;
728                if empty_from {
729                    return plan_err!("SELECT * with no tables specified is not valid");
730                }
731                let planned_options = self.plan_wildcard_options(
732                    plan,
733                    empty_from,
734                    planner_context,
735                    options,
736                )?;
737
738                Ok(SelectExpr::Wildcard(planned_options))
739            }
740            SelectItem::QualifiedWildcard(object_name, options) => {
741                Self::check_wildcard_options(&options)?;
742                let object_name = match object_name {
743                    SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
744                        object_name
745                    }
746                    SelectItemQualifiedWildcardKind::Expr(_) => {
747                        return plan_err!(
748                            "Qualified wildcard with expression not supported"
749                        )
750                    }
751                };
752                let qualifier = self.object_name_to_table_reference(object_name)?;
753                let planned_options = self.plan_wildcard_options(
754                    plan,
755                    empty_from,
756                    planner_context,
757                    options,
758                )?;
759
760                Ok(SelectExpr::QualifiedWildcard(qualifier, planned_options))
761            }
762        }
763    }
764
765    fn check_wildcard_options(options: &WildcardAdditionalOptions) -> Result<()> {
766        let WildcardAdditionalOptions {
767            // opt_exclude is handled
768            opt_exclude: _opt_exclude,
769            opt_except: _opt_except,
770            opt_rename,
771            opt_replace: _opt_replace,
772            opt_ilike: _opt_ilike,
773            wildcard_token: _wildcard_token,
774        } = options;
775
776        if opt_rename.is_some() {
777            not_impl_err!("wildcard * with RENAME not supported ")
778        } else {
779            Ok(())
780        }
781    }
782
783    /// If there is a REPLACE statement in the projected expression in the form of
784    /// "REPLACE (some_column_within_an_expr AS some_column)", we should plan the
785    /// replace expressions first.
786    fn plan_wildcard_options(
787        &self,
788        plan: &LogicalPlan,
789        empty_from: bool,
790        planner_context: &mut PlannerContext,
791        options: WildcardAdditionalOptions,
792    ) -> Result<WildcardOptions> {
793        let planned_option = WildcardOptions {
794            ilike: options.opt_ilike,
795            exclude: options.opt_exclude,
796            except: options.opt_except,
797            replace: None,
798            rename: options.opt_rename,
799        };
800        if let Some(replace) = options.opt_replace {
801            let replace_expr = replace
802                .items
803                .iter()
804                .map(|item| {
805                    self.sql_select_to_rex(
806                        SelectItem::UnnamedExpr(item.expr.clone()),
807                        plan,
808                        empty_from,
809                        planner_context,
810                    )
811                })
812                .collect::<Result<Vec<_>>>()?
813                .into_iter()
814                .filter_map(|expr| match expr {
815                    SelectExpr::Expression(expr) => Some(expr),
816                    _ => None,
817                })
818                .collect::<Vec<_>>();
819
820            let planned_replace = PlannedReplaceSelectItem {
821                items: replace.items.into_iter().map(|i| *i).collect(),
822                planned_expressions: replace_expr,
823            };
824            Ok(planned_option.with_replace(planned_replace))
825        } else {
826            Ok(planned_option)
827        }
828    }
829
830    /// Wrap a plan in a projection
831    pub(crate) fn project(
832        &self,
833        input: LogicalPlan,
834        expr: Vec<SelectExpr>,
835    ) -> Result<LogicalPlan> {
836        // convert to Expr for validate_schema_satisfies_exprs
837        let exprs = expr
838            .iter()
839            .filter_map(|e| match e {
840                SelectExpr::Expression(expr) => Some(expr.to_owned()),
841                _ => None,
842            })
843            .collect::<Vec<_>>();
844        self.validate_schema_satisfies_exprs(input.schema(), &exprs)?;
845
846        LogicalPlanBuilder::from(input).project(expr)?.build()
847    }
848
849    /// Create an aggregate plan.
850    ///
851    /// An aggregate plan consists of grouping expressions, aggregate expressions, an
852    /// optional HAVING expression (which is a filter on the output of the aggregate),
853    /// and an optional QUALIFY clause which may reference aggregates.
854    ///
855    /// # Arguments
856    ///
857    /// * `input`           - The input plan that will be aggregated. The grouping, aggregate, and
858    ///   "having" expressions must all be resolvable from this plan.
859    /// * `select_exprs`    - The projection expressions from the SELECT clause.
860    /// * `having_expr_opt` - Optional HAVING clause.
861    /// * `qualify_expr_opt` - Optional QUALIFY clause.
862    /// * `group_by_exprs`  - Grouping expressions from the GROUP BY clause. These can be column
863    ///   references or more complex expressions.
864    /// * `aggr_exprs`      - Aggregate expressions, such as `SUM(a)` or `COUNT(1)`.
865    ///
866    /// # Return
867    ///
868    /// The return value is a quadruplet of the following items:
869    ///
870    /// * `plan`                   - A [LogicalPlan::Aggregate] plan for the newly created aggregate.
871    /// * `select_exprs_post_aggr` - The projection expressions rewritten to reference columns from
872    ///   the aggregate
873    /// * `having_expr_post_aggr`  - The "having" expression rewritten to reference a column from
874    ///   the aggregate
875    /// * `qualify_expr_post_aggr`  - The "qualify" expression rewritten to reference a column from
876    ///   the aggregate
877    #[allow(clippy::type_complexity)]
878    fn aggregate(
879        &self,
880        input: &LogicalPlan,
881        select_exprs: &[Expr],
882        having_expr_opt: Option<&Expr>,
883        qualify_expr_opt: Option<&Expr>,
884        group_by_exprs: &[Expr],
885        aggr_exprs: &[Expr],
886    ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>, Option<Expr>)> {
887        // create the aggregate plan
888        let options =
889            LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
890        let plan = LogicalPlanBuilder::from(input.clone())
891            .with_options(options)
892            .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
893            .build()?;
894        let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
895            &agg.group_expr
896        } else {
897            unreachable!();
898        };
899
900        // in this next section of code we are re-writing the projection to refer to columns
901        // output by the aggregate plan. For example, if the projection contains the expression
902        // `SUM(a)` then we replace that with a reference to a column `SUM(a)` produced by
903        // the aggregate plan.
904
905        // combine the original grouping and aggregate expressions into one list (note that
906        // we do not add the "having" expression since that is not part of the projection)
907        let mut aggr_projection_exprs = vec![];
908        for expr in group_by_exprs {
909            match expr {
910                Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
911                    aggr_projection_exprs.extend_from_slice(exprs)
912                }
913                Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
914                    aggr_projection_exprs.extend_from_slice(exprs)
915                }
916                Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
917                    for exprs in lists_of_exprs {
918                        aggr_projection_exprs.extend_from_slice(exprs)
919                    }
920                }
921                _ => aggr_projection_exprs.push(expr.clone()),
922            }
923        }
924        aggr_projection_exprs.extend_from_slice(aggr_exprs);
925
926        // now attempt to resolve columns and replace with fully-qualified columns
927        let aggr_projection_exprs = aggr_projection_exprs
928            .iter()
929            .map(|expr| resolve_columns(expr, input))
930            .collect::<Result<Vec<Expr>>>()?;
931
932        // next we replace any expressions that are not a column with a column referencing
933        // an output column from the aggregate schema
934        let column_exprs_post_aggr = aggr_projection_exprs
935            .iter()
936            .map(|expr| expr_as_column_expr(expr, input))
937            .collect::<Result<Vec<Expr>>>()?;
938
939        // next we re-write the projection
940        let select_exprs_post_aggr = select_exprs
941            .iter()
942            .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
943            .collect::<Result<Vec<Expr>>>()?;
944
945        // finally, we have some validation that the re-written projection can be resolved
946        // from the aggregate output columns
947        check_columns_satisfy_exprs(
948            &column_exprs_post_aggr,
949            &select_exprs_post_aggr,
950            CheckColumnsSatisfyExprsPurpose::Aggregate(
951                CheckColumnsMustReferenceAggregatePurpose::Projection,
952            ),
953        )?;
954
955        // Rewrite the HAVING expression to use the columns produced by the
956        // aggregation.
957        let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt {
958            let having_expr_post_aggr =
959                rebase_expr(having_expr, &aggr_projection_exprs, input)?;
960
961            check_columns_satisfy_exprs(
962                &column_exprs_post_aggr,
963                std::slice::from_ref(&having_expr_post_aggr),
964                CheckColumnsSatisfyExprsPurpose::Aggregate(
965                    CheckColumnsMustReferenceAggregatePurpose::Having,
966                ),
967            )?;
968
969            Some(having_expr_post_aggr)
970        } else {
971            None
972        };
973
974        // Rewrite the QUALIFY expression to use the columns produced by the
975        // aggregation.
976        let qualify_expr_post_aggr = if let Some(qualify_expr) = qualify_expr_opt {
977            let qualify_expr_post_aggr =
978                rebase_expr(qualify_expr, &aggr_projection_exprs, input)?;
979
980            check_columns_satisfy_exprs(
981                &column_exprs_post_aggr,
982                std::slice::from_ref(&qualify_expr_post_aggr),
983                CheckColumnsSatisfyExprsPurpose::Aggregate(
984                    CheckColumnsMustReferenceAggregatePurpose::Qualify,
985                ),
986            )?;
987
988            Some(qualify_expr_post_aggr)
989        } else {
990            None
991        };
992
993        Ok((
994            plan,
995            select_exprs_post_aggr,
996            having_expr_post_aggr,
997            qualify_expr_post_aggr,
998        ))
999    }
1000
1001    // If the projection is done over a named window, that window
1002    // name must be defined. Otherwise, it gives an error.
1003    fn match_window_definitions(
1004        &self,
1005        projection: &mut [SelectItem],
1006        named_windows: &[NamedWindowDefinition],
1007    ) -> Result<()> {
1008        let named_windows: Vec<(&NamedWindowDefinition, String)> = named_windows
1009            .iter()
1010            .map(|w| (w, self.ident_normalizer.normalize(w.0.clone())))
1011            .collect();
1012        for proj in projection.iter_mut() {
1013            if let SelectItem::ExprWithAlias { expr, alias: _ }
1014            | SelectItem::UnnamedExpr(expr) = proj
1015            {
1016                let mut err = None;
1017                let _ = visit_expressions_mut(expr, |expr| {
1018                    if let SQLExpr::Function(f) = expr {
1019                        if let Some(WindowType::NamedWindow(ident)) = &f.over {
1020                            let normalized_ident =
1021                                self.ident_normalizer.normalize(ident.clone());
1022                            for (
1023                                NamedWindowDefinition(_, window_expr),
1024                                normalized_window_ident,
1025                            ) in named_windows.iter()
1026                            {
1027                                if normalized_ident.eq(normalized_window_ident) {
1028                                    f.over = Some(match window_expr {
1029                                        NamedWindowExpr::NamedWindow(ident) => {
1030                                            WindowType::NamedWindow(ident.clone())
1031                                        }
1032                                        NamedWindowExpr::WindowSpec(spec) => {
1033                                            WindowType::WindowSpec(spec.clone())
1034                                        }
1035                                    })
1036                                }
1037                            }
1038                            // All named windows must be defined with a WindowSpec.
1039                            if let Some(WindowType::NamedWindow(ident)) = &f.over {
1040                                err =
1041                                    Some(plan_err!("The window {ident} is not defined!"));
1042                                return ControlFlow::Break(());
1043                            }
1044                        }
1045                    }
1046                    ControlFlow::Continue(())
1047                });
1048                if let Some(err) = err {
1049                    return err;
1050                }
1051            }
1052        }
1053        Ok(())
1054    }
1055}
1056
1057// If there are any multiple-defined windows, we raise an error.
1058fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> {
1059    for (i, window_def_i) in window_defs.iter().enumerate() {
1060        for window_def_j in window_defs.iter().skip(i + 1) {
1061            if window_def_i.0 == window_def_j.0 {
1062                return plan_err!(
1063                    "The window {} is defined multiple times!",
1064                    window_def_i.0
1065                );
1066            }
1067        }
1068    }
1069    Ok(())
1070}
1071
1072/// Returns true if the expression recursively contains an `Expr::Unnest` expression
1073fn has_unnest_expr_recursively(expr: &Expr) -> bool {
1074    let mut has_unnest = false;
1075    let _ = expr.apply(|e| {
1076        if let Expr::Unnest(_) = e {
1077            has_unnest = true;
1078            Ok(TreeNodeRecursion::Stop)
1079        } else {
1080            Ok(TreeNodeRecursion::Continue)
1081        }
1082    });
1083    has_unnest
1084}