1use std::collections::HashSet;
19use std::ops::ControlFlow;
20use std::sync::Arc;
21
22use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
23use crate::query::to_order_by_exprs_with_select;
24use crate::utils::{
25 check_columns_satisfy_exprs, extract_aliases, rebase_expr, resolve_aliases_to_exprs,
26 resolve_columns, resolve_positions_to_exprs, rewrite_recursive_unnests_bottom_up,
27 CheckColumnsMustReferenceAggregatePurpose, CheckColumnsSatisfyExprsPurpose,
28};
29
30use datafusion_common::error::DataFusionErrorBuilder;
31use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
32use datafusion_common::{not_impl_err, plan_err, Result};
33use datafusion_common::{RecursionUnnestOption, UnnestOptions};
34use datafusion_expr::expr::{Alias, PlannedReplaceSelectItem, WildcardOptions};
35use datafusion_expr::expr_rewriter::{
36 normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_sorts,
37};
38use datafusion_expr::select_expr::SelectExpr;
39use datafusion_expr::utils::{
40 expr_as_column_expr, expr_to_columns, find_aggregate_exprs, find_window_exprs,
41};
42use datafusion_expr::{
43 Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder,
44 LogicalPlanBuilderOptions, Partitioning,
45};
46
47use indexmap::IndexMap;
48use sqlparser::ast::{
49 visit_expressions_mut, Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr,
50 OrderBy, SelectItemQualifiedWildcardKind, WildcardAdditionalOptions, WindowType,
51};
52use sqlparser::ast::{NamedWindowDefinition, Select, SelectItem, TableWithJoins};
53
54impl<S: ContextProvider> SqlToRel<'_, S> {
55 pub(super) fn select_to_plan(
57 &self,
58 mut select: Select,
59 query_order_by: Option<OrderBy>,
60 planner_context: &mut PlannerContext,
61 ) -> Result<LogicalPlan> {
62 if !select.cluster_by.is_empty() {
64 return not_impl_err!("CLUSTER BY");
65 }
66 if !select.lateral_views.is_empty() {
67 return not_impl_err!("LATERAL VIEWS");
68 }
69
70 if select.top.is_some() {
71 return not_impl_err!("TOP");
72 }
73 if !select.sort_by.is_empty() {
74 return not_impl_err!("SORT BY");
75 }
76
77 let plan = self.plan_from_tables(select.from, planner_context)?;
79 let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
80
81 let base_plan = self.plan_selection(select.selection, plan, planner_context)?;
83
84 check_conflicting_windows(&select.named_window)?;
86 self.match_window_definitions(&mut select.projection, &select.named_window)?;
87
88 let select_exprs = self.prepare_select_exprs(
90 &base_plan,
91 select.projection,
92 empty_from,
93 planner_context,
94 )?;
95
96 let projected_plan = self.project(base_plan.clone(), select_exprs)?;
98 let select_exprs = projected_plan.expressions();
99
100 let order_by =
101 to_order_by_exprs_with_select(query_order_by, Some(&select_exprs))?;
102
103 let mut combined_schema = base_plan.schema().as_ref().clone();
107 combined_schema.merge(projected_plan.schema());
108
109 let order_by_rex = self.order_by_to_sort_expr(
112 order_by,
113 projected_plan.schema().as_ref(),
114 planner_context,
115 true,
116 Some(base_plan.schema().as_ref()),
117 )?;
118 let order_by_rex = normalize_sorts(order_by_rex, &projected_plan)?;
119
120 let alias_map = extract_aliases(&select_exprs);
122
123 let having_expr_opt = select
125 .having
126 .map::<Result<Expr>, _>(|having_expr| {
127 let having_expr = self.sql_expr_to_logical_expr(
128 having_expr,
129 &combined_schema,
130 planner_context,
131 )?;
132 let having_expr = resolve_aliases_to_exprs(having_expr, &alias_map)?;
146 normalize_col(having_expr, &projected_plan)
147 })
148 .transpose()?;
149
150 let group_by_exprs = if let GroupByExpr::Expressions(exprs, _) = select.group_by {
152 exprs
153 .into_iter()
154 .map(|e| {
155 let group_by_expr = self.sql_expr_to_logical_expr(
156 e,
157 &combined_schema,
158 planner_context,
159 )?;
160
161 let mut alias_map = alias_map.clone();
163 for f in base_plan.schema().fields() {
164 alias_map.remove(f.name());
165 }
166 let group_by_expr =
167 resolve_aliases_to_exprs(group_by_expr, &alias_map)?;
168 let group_by_expr =
169 resolve_positions_to_exprs(group_by_expr, &select_exprs)?;
170 let group_by_expr = normalize_col(group_by_expr, &projected_plan)?;
171 self.validate_schema_satisfies_exprs(
172 base_plan.schema(),
173 std::slice::from_ref(&group_by_expr),
174 )?;
175 Ok(group_by_expr)
176 })
177 .collect::<Result<Vec<Expr>>>()?
178 } else {
179 select_exprs
182 .iter()
183 .filter(|select_expr| match select_expr {
184 Expr::AggregateFunction(_) => false,
185 Expr::Alias(Alias { expr, name: _, .. }) => {
186 !matches!(**expr, Expr::AggregateFunction(_))
187 }
188 _ => true,
189 })
190 .cloned()
191 .collect()
192 };
193
194 let qualify_expr_opt = select
196 .qualify
197 .map::<Result<Expr>, _>(|qualify_expr| {
198 let qualify_expr = self.sql_expr_to_logical_expr(
199 qualify_expr,
200 &combined_schema,
201 planner_context,
202 )?;
203 let qualify_expr = resolve_aliases_to_exprs(qualify_expr, &alias_map)?;
217 normalize_col(qualify_expr, &projected_plan)
218 })
219 .transpose()?;
220
221 let aggr_expr_haystack = select_exprs
224 .iter()
225 .chain(having_expr_opt.iter())
226 .chain(qualify_expr_opt.iter());
227 let aggr_exprs = find_aggregate_exprs(aggr_expr_haystack);
229
230 let (
232 plan,
233 mut select_exprs_post_aggr,
234 having_expr_post_aggr,
235 qualify_expr_post_aggr,
236 ) = if !group_by_exprs.is_empty() || !aggr_exprs.is_empty() {
237 self.aggregate(
238 &base_plan,
239 &select_exprs,
240 having_expr_opt.as_ref(),
241 qualify_expr_opt.as_ref(),
242 &group_by_exprs,
243 &aggr_exprs,
244 )?
245 } else {
246 match having_expr_opt {
247 Some(having_expr) => return plan_err!("HAVING clause references: {having_expr} must appear in the GROUP BY clause or be used in an aggregate function"),
248 None => (base_plan.clone(), select_exprs.clone(), having_expr_opt, qualify_expr_opt)
249 }
250 };
251
252 let plan = if let Some(having_expr_post_aggr) = having_expr_post_aggr {
253 LogicalPlanBuilder::from(plan)
254 .having(having_expr_post_aggr)?
255 .build()?
256 } else {
257 plan
258 };
259
260 let windows_expr_haystack = select_exprs_post_aggr
263 .iter()
264 .chain(qualify_expr_post_aggr.iter());
265 let window_func_exprs = find_window_exprs(windows_expr_haystack);
268
269 let plan = if window_func_exprs.is_empty() {
272 plan
273 } else {
274 let plan = LogicalPlanBuilder::window_plan(plan, window_func_exprs.clone())?;
275
276 select_exprs_post_aggr = select_exprs_post_aggr
278 .iter()
279 .map(|expr| rebase_expr(expr, &window_func_exprs, &plan))
280 .collect::<Result<Vec<Expr>>>()?;
281
282 plan
283 };
284
285 let plan = if let Some(qualify_expr) = qualify_expr_post_aggr {
288 if window_func_exprs.is_empty() {
290 return plan_err!(
291 "QUALIFY clause requires window functions in the SELECT list or QUALIFY clause"
292 );
293 }
294
295 let windows_projection_exprs = window_func_exprs
297 .iter()
298 .map(|expr| resolve_columns(expr, &plan))
299 .collect::<Result<Vec<Expr>>>()?;
300
301 let qualify_expr_post_window =
303 rebase_expr(&qualify_expr, &windows_projection_exprs, &plan)?;
304
305 self.validate_schema_satisfies_exprs(
307 plan.schema(),
308 std::slice::from_ref(&qualify_expr_post_window),
309 )?;
310
311 LogicalPlanBuilder::from(plan)
312 .filter(qualify_expr_post_window)?
313 .build()?
314 } else {
315 plan
316 };
317
318 let plan = self.try_process_unnest(plan, select_exprs_post_aggr)?;
320
321 let plan = match select.distinct {
323 None => Ok(plan),
324 Some(Distinct::Distinct) => {
325 LogicalPlanBuilder::from(plan).distinct()?.build()
326 }
327 Some(Distinct::On(on_expr)) => {
328 if !aggr_exprs.is_empty()
329 || !group_by_exprs.is_empty()
330 || !window_func_exprs.is_empty()
331 {
332 return not_impl_err!("DISTINCT ON expressions with GROUP BY, aggregation or window functions are not supported ");
333 }
334
335 let on_expr = on_expr
336 .into_iter()
337 .map(|e| {
338 self.sql_expr_to_logical_expr(e, plan.schema(), planner_context)
339 })
340 .collect::<Result<Vec<_>>>()?;
341
342 LogicalPlanBuilder::from(base_plan)
344 .distinct_on(on_expr, select_exprs, None)?
345 .build()
346 }
347 }?;
348
349 let plan = if !select.distribute_by.is_empty() {
351 let x = select
352 .distribute_by
353 .iter()
354 .map(|e| {
355 self.sql_expr_to_logical_expr(
356 e.clone(),
357 &combined_schema,
358 planner_context,
359 )
360 })
361 .collect::<Result<Vec<_>>>()?;
362 LogicalPlanBuilder::from(plan)
363 .repartition(Partitioning::DistributeBy(x))?
364 .build()?
365 } else {
366 plan
367 };
368
369 self.order_by(plan, order_by_rex)
370 }
371
372 pub(super) fn try_process_unnest(
374 &self,
375 input: LogicalPlan,
376 select_exprs: Vec<Expr>,
377 ) -> Result<LogicalPlan> {
378 let input = self.try_process_aggregate_unnest(input)?;
380
381 let mut intermediate_plan = input;
382 let mut intermediate_select_exprs = select_exprs;
383 if !intermediate_select_exprs
385 .iter()
386 .any(has_unnest_expr_recursively)
387 {
388 return LogicalPlanBuilder::from(intermediate_plan)
389 .project(intermediate_select_exprs)?
390 .build();
391 }
392
393 for i in 0.. {
397 let mut unnest_columns = IndexMap::new();
398 let mut inner_projection_exprs = vec![];
401
402 let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
408 &intermediate_plan,
409 &mut unnest_columns,
410 &mut inner_projection_exprs,
411 &intermediate_select_exprs,
412 )?;
413
414 if unnest_columns.is_empty() {
416 if i == 0 {
418 return LogicalPlanBuilder::from(intermediate_plan)
419 .project(intermediate_select_exprs)?
420 .build();
421 }
422 break;
423 } else {
424 let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false);
426 let mut unnest_col_vec = vec![];
427
428 for (col, maybe_list_unnest) in unnest_columns.into_iter() {
429 if let Some(list_unnest) = maybe_list_unnest {
430 unnest_options = list_unnest.into_iter().fold(
431 unnest_options,
432 |options, unnest_list| {
433 options.with_recursions(RecursionUnnestOption {
434 input_column: col.clone(),
435 output_column: unnest_list.output_column,
436 depth: unnest_list.depth,
437 })
438 },
439 );
440 }
441 unnest_col_vec.push(col);
442 }
443 let plan = LogicalPlanBuilder::from(intermediate_plan)
444 .project(inner_projection_exprs)?
445 .unnest_columns_with_options(unnest_col_vec, unnest_options)?
446 .build()?;
447 intermediate_plan = plan;
448 intermediate_select_exprs = outer_projection_exprs;
449 }
450 }
451
452 LogicalPlanBuilder::from(intermediate_plan)
453 .project(intermediate_select_exprs)?
454 .build()
455 }
456
457 fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
458 match input {
459 LogicalPlan::Aggregate(ref agg)
461 if !&agg.group_expr.iter().any(has_unnest_expr_recursively) =>
462 {
463 Ok(input)
464 }
465 LogicalPlan::Aggregate(agg) => {
466 let agg_expr = agg.aggr_expr.clone();
467 let (new_input, new_group_by_exprs) =
468 self.try_process_group_by_unnest(agg)?;
469 let options = LogicalPlanBuilderOptions::new()
470 .with_add_implicit_group_by_exprs(true);
471 LogicalPlanBuilder::from(new_input)
472 .with_options(options)
473 .aggregate(new_group_by_exprs, agg_expr)?
474 .build()
475 }
476 LogicalPlan::Filter(mut filter) => {
477 filter.input =
478 Arc::new(self.try_process_aggregate_unnest(Arc::unwrap_or_clone(
479 filter.input,
480 ))?);
481 Ok(LogicalPlan::Filter(filter))
482 }
483 _ => Ok(input),
484 }
485 }
486
487 fn try_process_group_by_unnest(
491 &self,
492 agg: Aggregate,
493 ) -> Result<(LogicalPlan, Vec<Expr>)> {
494 let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;
495
496 let Aggregate {
497 input,
498 group_expr,
499 aggr_expr,
500 ..
501 } = agg;
502
503 let mut intermediate_plan = Arc::unwrap_or_clone(input);
520 let mut intermediate_select_exprs = group_expr;
521
522 loop {
523 let mut unnest_columns = IndexMap::new();
524 let mut inner_projection_exprs = vec![];
525
526 let outer_projection_exprs = rewrite_recursive_unnests_bottom_up(
527 &intermediate_plan,
528 &mut unnest_columns,
529 &mut inner_projection_exprs,
530 &intermediate_select_exprs,
531 )?;
532
533 if unnest_columns.is_empty() {
534 break;
535 } else {
536 let mut unnest_options = UnnestOptions::new().with_preserve_nulls(false);
537
538 let mut projection_exprs = match &aggr_expr_using_columns {
539 Some(exprs) => (*exprs).clone(),
540 None => {
541 let mut columns = HashSet::new();
542 for expr in &aggr_expr {
543 expr.apply_with_lambdas_params(|expr, lambdas_params| {
544 if let Expr::Column(c) = expr {
545 if !c.is_lambda_parameter(lambdas_params) {
546 columns.insert(Expr::Column(c.clone()));
547 }
548 }
549 Ok(TreeNodeRecursion::Continue)
550 })
551 .expect("Unexpected error");
553 }
554 aggr_expr_using_columns = Some(columns.clone());
555 columns
556 }
557 };
558 projection_exprs.extend(inner_projection_exprs);
559
560 let mut unnest_col_vec = vec![];
561
562 for (col, maybe_list_unnest) in unnest_columns.into_iter() {
563 if let Some(list_unnest) = maybe_list_unnest {
564 unnest_options = list_unnest.into_iter().fold(
565 unnest_options,
566 |options, unnest_list| {
567 options.with_recursions(RecursionUnnestOption {
568 input_column: col.clone(),
569 output_column: unnest_list.output_column,
570 depth: unnest_list.depth,
571 })
572 },
573 );
574 }
575 unnest_col_vec.push(col);
576 }
577
578 intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
579 .project(projection_exprs)?
580 .unnest_columns_with_options(unnest_col_vec, unnest_options)?
581 .build()?;
582
583 intermediate_select_exprs = outer_projection_exprs;
584 }
585 }
586
587 Ok((intermediate_plan, intermediate_select_exprs))
588 }
589
590 pub(crate) fn plan_selection(
591 &self,
592 selection: Option<SQLExpr>,
593 plan: LogicalPlan,
594 planner_context: &mut PlannerContext,
595 ) -> Result<LogicalPlan> {
596 match selection {
597 Some(predicate_expr) => {
598 let fallback_schemas = plan.fallback_normalize_schemas();
599 let outer_query_schema = planner_context.outer_query_schema().cloned();
600 let outer_query_schema_vec = outer_query_schema
601 .as_ref()
602 .map(|schema| vec![schema])
603 .unwrap_or_else(Vec::new);
604
605 let filter_expr =
606 self.sql_to_expr(predicate_expr, plan.schema(), planner_context)?;
607
608 let aggregate_exprs =
610 find_aggregate_exprs(std::slice::from_ref(&filter_expr));
611 if !aggregate_exprs.is_empty() {
612 return plan_err!(
613 "Aggregate functions are not allowed in the WHERE clause. Consider using HAVING instead"
614 );
615 }
616
617 let mut using_columns = HashSet::new();
618 expr_to_columns(&filter_expr, &mut using_columns)?;
619 let filter_expr = normalize_col_with_schemas_and_ambiguity_check(
620 filter_expr,
621 &[&[plan.schema()], &fallback_schemas, &outer_query_schema_vec],
622 &[using_columns],
623 )?;
624
625 Ok(LogicalPlan::Filter(Filter::try_new(
626 filter_expr,
627 Arc::new(plan),
628 )?))
629 }
630 None => Ok(plan),
631 }
632 }
633
634 pub(crate) fn plan_from_tables(
635 &self,
636 mut from: Vec<TableWithJoins>,
637 planner_context: &mut PlannerContext,
638 ) -> Result<LogicalPlan> {
639 match from.len() {
640 0 => Ok(LogicalPlanBuilder::empty(true).build()?),
641 1 => {
642 let input = from.remove(0);
643 self.plan_table_with_joins(input, planner_context)
644 }
645 _ => {
646 let mut from = from.into_iter();
647
648 let mut left = LogicalPlanBuilder::from({
649 let input = from.next().unwrap();
650 self.plan_table_with_joins(input, planner_context)?
651 });
652 let old_outer_from_schema = {
653 let left_schema = Some(Arc::clone(left.schema()));
654 planner_context.set_outer_from_schema(left_schema)
655 };
656 for input in from {
657 let right = self.plan_table_with_joins(input, planner_context)?;
659 left = left.cross_join(right)?;
660 let left_schema = Some(Arc::clone(left.schema()));
662 planner_context.set_outer_from_schema(left_schema);
663 }
664 planner_context.set_outer_from_schema(old_outer_from_schema);
665 left.build()
666 }
667 }
668 }
669
670 pub(crate) fn prepare_select_exprs(
672 &self,
673 plan: &LogicalPlan,
674 projection: Vec<SelectItem>,
675 empty_from: bool,
676 planner_context: &mut PlannerContext,
677 ) -> Result<Vec<SelectExpr>> {
678 let mut prepared_select_exprs = vec![];
679 let mut error_builder = DataFusionErrorBuilder::new();
680
681 for expr in projection {
682 match self.sql_select_to_rex(expr, plan, empty_from, planner_context) {
683 Ok(expr) => prepared_select_exprs.push(expr),
684 Err(err) => error_builder.add_error(err),
685 }
686 }
687 error_builder.error_or(prepared_select_exprs)
688 }
689
690 fn sql_select_to_rex(
692 &self,
693 sql: SelectItem,
694 plan: &LogicalPlan,
695 empty_from: bool,
696 planner_context: &mut PlannerContext,
697 ) -> Result<SelectExpr> {
698 match sql {
699 SelectItem::UnnamedExpr(expr) => {
700 let expr = self.sql_to_expr(expr, plan.schema(), planner_context)?;
701 let col = normalize_col_with_schemas_and_ambiguity_check(
702 expr,
703 &[&[plan.schema()]],
704 &plan.using_columns()?,
705 )?;
706
707 Ok(SelectExpr::Expression(col))
708 }
709 SelectItem::ExprWithAlias { expr, alias } => {
710 let select_expr =
711 self.sql_to_expr(expr, plan.schema(), planner_context)?;
712 let col = normalize_col_with_schemas_and_ambiguity_check(
713 select_expr,
714 &[&[plan.schema()]],
715 &plan.using_columns()?,
716 )?;
717 let name = self.ident_normalizer.normalize(alias);
718 let expr = match &col {
720 Expr::Column(column) if column.name.eq(&name) => col,
721 _ => col.alias(name),
722 };
723
724 Ok(SelectExpr::Expression(expr))
725 }
726 SelectItem::Wildcard(options) => {
727 Self::check_wildcard_options(&options)?;
728 if empty_from {
729 return plan_err!("SELECT * with no tables specified is not valid");
730 }
731 let planned_options = self.plan_wildcard_options(
732 plan,
733 empty_from,
734 planner_context,
735 options,
736 )?;
737
738 Ok(SelectExpr::Wildcard(planned_options))
739 }
740 SelectItem::QualifiedWildcard(object_name, options) => {
741 Self::check_wildcard_options(&options)?;
742 let object_name = match object_name {
743 SelectItemQualifiedWildcardKind::ObjectName(object_name) => {
744 object_name
745 }
746 SelectItemQualifiedWildcardKind::Expr(_) => {
747 return plan_err!(
748 "Qualified wildcard with expression not supported"
749 )
750 }
751 };
752 let qualifier = self.object_name_to_table_reference(object_name)?;
753 let planned_options = self.plan_wildcard_options(
754 plan,
755 empty_from,
756 planner_context,
757 options,
758 )?;
759
760 Ok(SelectExpr::QualifiedWildcard(qualifier, planned_options))
761 }
762 }
763 }
764
765 fn check_wildcard_options(options: &WildcardAdditionalOptions) -> Result<()> {
766 let WildcardAdditionalOptions {
767 opt_exclude: _opt_exclude,
769 opt_except: _opt_except,
770 opt_rename,
771 opt_replace: _opt_replace,
772 opt_ilike: _opt_ilike,
773 wildcard_token: _wildcard_token,
774 } = options;
775
776 if opt_rename.is_some() {
777 not_impl_err!("wildcard * with RENAME not supported ")
778 } else {
779 Ok(())
780 }
781 }
782
783 fn plan_wildcard_options(
787 &self,
788 plan: &LogicalPlan,
789 empty_from: bool,
790 planner_context: &mut PlannerContext,
791 options: WildcardAdditionalOptions,
792 ) -> Result<WildcardOptions> {
793 let planned_option = WildcardOptions {
794 ilike: options.opt_ilike,
795 exclude: options.opt_exclude,
796 except: options.opt_except,
797 replace: None,
798 rename: options.opt_rename,
799 };
800 if let Some(replace) = options.opt_replace {
801 let replace_expr = replace
802 .items
803 .iter()
804 .map(|item| {
805 self.sql_select_to_rex(
806 SelectItem::UnnamedExpr(item.expr.clone()),
807 plan,
808 empty_from,
809 planner_context,
810 )
811 })
812 .collect::<Result<Vec<_>>>()?
813 .into_iter()
814 .filter_map(|expr| match expr {
815 SelectExpr::Expression(expr) => Some(expr),
816 _ => None,
817 })
818 .collect::<Vec<_>>();
819
820 let planned_replace = PlannedReplaceSelectItem {
821 items: replace.items.into_iter().map(|i| *i).collect(),
822 planned_expressions: replace_expr,
823 };
824 Ok(planned_option.with_replace(planned_replace))
825 } else {
826 Ok(planned_option)
827 }
828 }
829
830 pub(crate) fn project(
832 &self,
833 input: LogicalPlan,
834 expr: Vec<SelectExpr>,
835 ) -> Result<LogicalPlan> {
836 let exprs = expr
838 .iter()
839 .filter_map(|e| match e {
840 SelectExpr::Expression(expr) => Some(expr.to_owned()),
841 _ => None,
842 })
843 .collect::<Vec<_>>();
844 self.validate_schema_satisfies_exprs(input.schema(), &exprs)?;
845
846 LogicalPlanBuilder::from(input).project(expr)?.build()
847 }
848
849 #[allow(clippy::type_complexity)]
878 fn aggregate(
879 &self,
880 input: &LogicalPlan,
881 select_exprs: &[Expr],
882 having_expr_opt: Option<&Expr>,
883 qualify_expr_opt: Option<&Expr>,
884 group_by_exprs: &[Expr],
885 aggr_exprs: &[Expr],
886 ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>, Option<Expr>)> {
887 let options =
889 LogicalPlanBuilderOptions::new().with_add_implicit_group_by_exprs(true);
890 let plan = LogicalPlanBuilder::from(input.clone())
891 .with_options(options)
892 .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
893 .build()?;
894 let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
895 &agg.group_expr
896 } else {
897 unreachable!();
898 };
899
900 let mut aggr_projection_exprs = vec![];
908 for expr in group_by_exprs {
909 match expr {
910 Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
911 aggr_projection_exprs.extend_from_slice(exprs)
912 }
913 Expr::GroupingSet(GroupingSet::Cube(exprs)) => {
914 aggr_projection_exprs.extend_from_slice(exprs)
915 }
916 Expr::GroupingSet(GroupingSet::GroupingSets(lists_of_exprs)) => {
917 for exprs in lists_of_exprs {
918 aggr_projection_exprs.extend_from_slice(exprs)
919 }
920 }
921 _ => aggr_projection_exprs.push(expr.clone()),
922 }
923 }
924 aggr_projection_exprs.extend_from_slice(aggr_exprs);
925
926 let aggr_projection_exprs = aggr_projection_exprs
928 .iter()
929 .map(|expr| resolve_columns(expr, input))
930 .collect::<Result<Vec<Expr>>>()?;
931
932 let column_exprs_post_aggr = aggr_projection_exprs
935 .iter()
936 .map(|expr| expr_as_column_expr(expr, input))
937 .collect::<Result<Vec<Expr>>>()?;
938
939 let select_exprs_post_aggr = select_exprs
941 .iter()
942 .map(|expr| rebase_expr(expr, &aggr_projection_exprs, input))
943 .collect::<Result<Vec<Expr>>>()?;
944
945 check_columns_satisfy_exprs(
948 &column_exprs_post_aggr,
949 &select_exprs_post_aggr,
950 CheckColumnsSatisfyExprsPurpose::Aggregate(
951 CheckColumnsMustReferenceAggregatePurpose::Projection,
952 ),
953 )?;
954
955 let having_expr_post_aggr = if let Some(having_expr) = having_expr_opt {
958 let having_expr_post_aggr =
959 rebase_expr(having_expr, &aggr_projection_exprs, input)?;
960
961 check_columns_satisfy_exprs(
962 &column_exprs_post_aggr,
963 std::slice::from_ref(&having_expr_post_aggr),
964 CheckColumnsSatisfyExprsPurpose::Aggregate(
965 CheckColumnsMustReferenceAggregatePurpose::Having,
966 ),
967 )?;
968
969 Some(having_expr_post_aggr)
970 } else {
971 None
972 };
973
974 let qualify_expr_post_aggr = if let Some(qualify_expr) = qualify_expr_opt {
977 let qualify_expr_post_aggr =
978 rebase_expr(qualify_expr, &aggr_projection_exprs, input)?;
979
980 check_columns_satisfy_exprs(
981 &column_exprs_post_aggr,
982 std::slice::from_ref(&qualify_expr_post_aggr),
983 CheckColumnsSatisfyExprsPurpose::Aggregate(
984 CheckColumnsMustReferenceAggregatePurpose::Qualify,
985 ),
986 )?;
987
988 Some(qualify_expr_post_aggr)
989 } else {
990 None
991 };
992
993 Ok((
994 plan,
995 select_exprs_post_aggr,
996 having_expr_post_aggr,
997 qualify_expr_post_aggr,
998 ))
999 }
1000
1001 fn match_window_definitions(
1004 &self,
1005 projection: &mut [SelectItem],
1006 named_windows: &[NamedWindowDefinition],
1007 ) -> Result<()> {
1008 let named_windows: Vec<(&NamedWindowDefinition, String)> = named_windows
1009 .iter()
1010 .map(|w| (w, self.ident_normalizer.normalize(w.0.clone())))
1011 .collect();
1012 for proj in projection.iter_mut() {
1013 if let SelectItem::ExprWithAlias { expr, alias: _ }
1014 | SelectItem::UnnamedExpr(expr) = proj
1015 {
1016 let mut err = None;
1017 let _ = visit_expressions_mut(expr, |expr| {
1018 if let SQLExpr::Function(f) = expr {
1019 if let Some(WindowType::NamedWindow(ident)) = &f.over {
1020 let normalized_ident =
1021 self.ident_normalizer.normalize(ident.clone());
1022 for (
1023 NamedWindowDefinition(_, window_expr),
1024 normalized_window_ident,
1025 ) in named_windows.iter()
1026 {
1027 if normalized_ident.eq(normalized_window_ident) {
1028 f.over = Some(match window_expr {
1029 NamedWindowExpr::NamedWindow(ident) => {
1030 WindowType::NamedWindow(ident.clone())
1031 }
1032 NamedWindowExpr::WindowSpec(spec) => {
1033 WindowType::WindowSpec(spec.clone())
1034 }
1035 })
1036 }
1037 }
1038 if let Some(WindowType::NamedWindow(ident)) = &f.over {
1040 err =
1041 Some(plan_err!("The window {ident} is not defined!"));
1042 return ControlFlow::Break(());
1043 }
1044 }
1045 }
1046 ControlFlow::Continue(())
1047 });
1048 if let Some(err) = err {
1049 return err;
1050 }
1051 }
1052 }
1053 Ok(())
1054 }
1055}
1056
1057fn check_conflicting_windows(window_defs: &[NamedWindowDefinition]) -> Result<()> {
1059 for (i, window_def_i) in window_defs.iter().enumerate() {
1060 for window_def_j in window_defs.iter().skip(i + 1) {
1061 if window_def_i.0 == window_def_j.0 {
1062 return plan_err!(
1063 "The window {} is defined multiple times!",
1064 window_def_i.0
1065 );
1066 }
1067 }
1068 }
1069 Ok(())
1070}
1071
1072fn has_unnest_expr_recursively(expr: &Expr) -> bool {
1074 let mut has_unnest = false;
1075 let _ = expr.apply(|e| {
1076 if let Expr::Unnest(_) = e {
1077 has_unnest = true;
1078 Ok(TreeNodeRecursion::Stop)
1079 } else {
1080 Ok(TreeNodeRecursion::Continue)
1081 }
1082 });
1083 has_unnest
1084}