datafusion_sql/expr/
subquery.rs1use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
19use datafusion_common::{plan_err, DFSchema, Diagnostic, Result, Span, Spans};
20use datafusion_expr::expr::{Exists, InSubquery};
21use datafusion_expr::{Expr, LogicalPlan, Subquery};
22use sqlparser::ast::Expr as SQLExpr;
23use sqlparser::ast::{Query, SelectItem, SetExpr};
24use std::sync::Arc;
25
26impl<S: ContextProvider> SqlToRel<'_, S> {
27 pub(super) fn parse_exists_subquery(
28 &self,
29 subquery: Query,
30 negated: bool,
31 input_schema: &DFSchema,
32 planner_context: &mut PlannerContext,
33 ) -> Result<Expr> {
34 let old_outer_query_schema =
35 planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
36 let sub_plan = self.query_to_plan(subquery, planner_context)?;
37 let outer_ref_columns = sub_plan.all_out_ref_exprs();
38 planner_context.set_outer_query_schema(old_outer_query_schema);
39 Ok(Expr::Exists(Exists {
40 subquery: Subquery {
41 subquery: Arc::new(sub_plan),
42 outer_ref_columns,
43 spans: Spans::new(),
44 },
45 negated,
46 }))
47 }
48
49 pub(super) fn parse_in_subquery(
50 &self,
51 expr: SQLExpr,
52 subquery: Query,
53 negated: bool,
54 input_schema: &DFSchema,
55 planner_context: &mut PlannerContext,
56 ) -> Result<Expr> {
57 let old_outer_query_schema =
58 planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
59
60 let mut spans = Spans::new();
61 if let SetExpr::Select(select) = &subquery.body.as_ref() {
62 for item in &select.projection {
63 if let SelectItem::UnnamedExpr(SQLExpr::Identifier(ident)) = item {
64 if let Some(span) = Span::try_from_sqlparser_span(ident.span) {
65 spans.add_span(span);
66 }
67 }
68 }
69 }
70
71 let sub_plan = self.query_to_plan(subquery, planner_context)?;
72 let outer_ref_columns = sub_plan.all_out_ref_exprs();
73 planner_context.set_outer_query_schema(old_outer_query_schema);
74
75 self.validate_single_column(
76 &sub_plan,
77 spans.clone(),
78 "Too many columns! The subquery should only return one column",
79 "Select only one column in the subquery",
80 )?;
81
82 let expr_obj = self.sql_to_expr(expr, input_schema, planner_context)?;
83
84 Ok(Expr::InSubquery(InSubquery::new(
85 Box::new(expr_obj),
86 Subquery {
87 subquery: Arc::new(sub_plan),
88 outer_ref_columns,
89 spans,
90 },
91 negated,
92 )))
93 }
94
95 pub(super) fn parse_scalar_subquery(
96 &self,
97 subquery: Query,
98 input_schema: &DFSchema,
99 planner_context: &mut PlannerContext,
100 ) -> Result<Expr> {
101 let old_outer_query_schema =
102 planner_context.set_outer_query_schema(Some(input_schema.clone().into()));
103 let mut spans = Spans::new();
104 if let SetExpr::Select(select) = subquery.body.as_ref() {
105 for item in &select.projection {
106 if let SelectItem::ExprWithAlias { alias, .. } = item {
107 if let Some(span) = Span::try_from_sqlparser_span(alias.span) {
108 spans.add_span(span);
109 }
110 }
111 }
112 }
113 let sub_plan = self.query_to_plan(subquery, planner_context)?;
114 let outer_ref_columns = sub_plan.all_out_ref_exprs();
115 planner_context.set_outer_query_schema(old_outer_query_schema);
116
117 self.validate_single_column(
118 &sub_plan,
119 spans.clone(),
120 "Too many columns! The subquery should only return one column",
121 "Select only one column in the subquery",
122 )?;
123
124 Ok(Expr::ScalarSubquery(Subquery {
125 subquery: Arc::new(sub_plan),
126 outer_ref_columns,
127 spans,
128 }))
129 }
130
131 fn validate_single_column(
132 &self,
133 sub_plan: &LogicalPlan,
134 spans: Spans,
135 error_message: &str,
136 help_message: &str,
137 ) -> Result<()> {
138 if sub_plan.schema().fields().len() > 1 {
139 let sub_schema = sub_plan.schema();
140 let field_names = sub_schema.field_names();
141 let diagnostic =
142 self.build_multi_column_diagnostic(spans, error_message, help_message);
143 plan_err!("{}: {}", error_message, field_names.join(", "); diagnostic=diagnostic)
144 } else {
145 Ok(())
146 }
147 }
148
149 fn build_multi_column_diagnostic(
150 &self,
151 spans: Spans,
152 error_message: &str,
153 help_message: &str,
154 ) -> Diagnostic {
155 let full_span = Span::union_iter(spans.0.iter().cloned());
156 let mut diagnostic = Diagnostic::new_error(error_message, full_span);
157
158 for (i, span) in spans.iter().skip(1).enumerate() {
159 diagnostic.add_note(format!("Extra column {}", i + 1), Some(*span));
160 }
161
162 diagnostic.add_help(help_message, None);
163 diagnostic
164 }
165}