datafusion_sql/expr/
subquery.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans};
20use datafusion_expr::expr::{Exists, InSubquery};
21use datafusion_expr::{Expr, LogicalPlan, Subquery};
22use sqlparser::ast::Expr as SQLExpr;
23use sqlparser::ast::{Query, SelectItem, SetExpr};
24use std::sync::Arc;
25
26impl<S: ContextProvider> SqlToRel<'_, S> {
27    pub(super) fn parse_exists_subquery(
28        &self,
29        subquery: Query,
30        negated: bool,
31        input_schema: &DFSchema,
32        planner_context: &mut PlannerContext,
33    ) -> Result<Expr> {
34        let old_outer_query_schema =
35            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
36        let sub_plan = self.query_to_plan(subquery, planner_context)?;
37        let outer_ref_columns = sub_plan.all_out_ref_exprs();
38        planner_context.set_outer_query_schema(old_outer_query_schema);
39        Ok(Expr::Exists(Exists {
40            subquery: Subquery {
41                subquery: Arc::new(sub_plan),
42                outer_ref_columns,
43                spans: Spans::new(),
44            },
45            negated,
46        }))
47    }
48
49    pub(super) fn parse_in_subquery(
50        &self,
51        expr: SQLExpr,
52        subquery: Query,
53        negated: bool,
54        input_schema: &DFSchema,
55        planner_context: &mut PlannerContext,
56    ) -> Result<Expr> {
57        let old_outer_query_schema =
58            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
59
60        let mut spans = Spans::new();
61        if let SetExpr::Select(select) = &subquery.body.as_ref() {
62            for item in &select.projection {
63                if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item {
64                    if let Some(span) = Span::try_from_sqlparser_span(ident.span) {
65                        spans.add_span(span);
66                    }
67                }
68            }
69        }
70
71        let sub_plan = self.query_to_plan(subquery, planner_context)?;
72        let outer_ref_columns = sub_plan.all_out_ref_exprs();
73        planner_context.set_outer_query_schema(old_outer_query_schema);
74
75        self.validate_single_column(
76            &sub_plan,
77            spans.clone(),
78            "Too many columns! The subquery should only return one column",
79            "Select only one column in the subquery",
80        )?;
81
82        let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;
83
84        Ok(Expr::InSubquery(InSubquery::new(
85            Box::new(expr_obj),
86            Subquery {
87                subquery: Arc::new(sub_plan),
88                outer_ref_columns,
89                spans,
90            },
91            negated,
92        )))
93    }
94
95    pub(super) fn parse_scalar_subquery(
96        &self,
97        subquery: Query,
98        input_schema: &DFSchema,
99        planner_context: &mut PlannerContext,
100    ) -> Result<Expr> {
101        let old_outer_query_schema =
102            planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
103        let mut spans = Spans::new();
104        if let SetExpr::Select(select) = subquery.body.as_ref() {
105            for item in &select.projection {
106                if let SelectItem::ExprWithAlias { alias, .. } = item {
107                    if let Some(span) = Span::try_from_sqlparser_span(alias.span) {
108                        spans.add_span(span);
109                    }
110                }
111            }
112        }
113        let sub_plan = self.query_to_plan(subquery, planner_context)?;
114        let outer_ref_columns = sub_plan.all_out_ref_exprs();
115        planner_context.set_outer_query_schema(old_outer_query_schema);
116
117        self.validate_single_column(
118            &sub_plan,
119            spans.clone(),
120            "Too many columns! The subquery should only return one column",
121            "Select only one column in the subquery",
122        )?;
123
124        Ok(Expr::ScalarSubquery(Subquery {
125            subquery: Arc::new(sub_plan),
126            outer_ref_columns,
127            spans,
128        }))
129    }
130
131    fn validate_single_column(
132        &self,
133        sub_plan: &LogicalPlan,
134        spans: Spans,
135        error_message: &str,
136        help_message: &str,
137    ) -> Result<()> {
138        if sub_plan.schema().fields().len() > 1 {
139            let sub_schema = sub_plan.schema();
140            let field_names = sub_schema.field_names();
141            let diagnostic =
142                self.build_multi_column_diagnostic(spans, error_message, help_message);
143            plan_err!("{}: {}", error_message, field_names.join(", "); diagnostic=diagnostic)
144        } else {
145            Ok(())
146        }
147    }
148
149    fn build_multi_column_diagnostic(
150        &self,
151        spans: Spans,
152        error_message: &str,
153        help_message: &str,
154    ) -> Diagnostic {
155        let full_span = Span::union_iter(spans.0.iter().cloned());
156        let mut diagnostic = Diagnostic::new_error(error_message, full_span);
157
158        for (i, span) in spans.iter().skip(1).enumerate() {
159            diagnostic.add_note(format!("Extra column {}", i + 1), Some(*span));
160        }
161
162        diagnostic.add_help(help_message, None);
163        diagnostic
164    }
165}